mirror of
https://github.com/fluencelabs/redis
synced 2025-04-25 10:32:14 +00:00
Merge pull request #6798 from guybe7/module_circular_block
Fix memory corruption in moduleHandleBlockedClients
This commit is contained in:
commit
10afb9639a
18
src/module.c
18
src/module.c
@ -4393,14 +4393,26 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
||||
* can really be unblocked, since the module was able to serve the client.
|
||||
* If the callback returns REDISMODULE_OK, then the client can be unblocked,
|
||||
* otherwise the client remains blocked and we'll retry again when one of
|
||||
* the keys it blocked for becomes "ready" again. */
|
||||
* the keys it blocked for becomes "ready" again.
|
||||
* This function returns 1 if client was served (and should be unblocked) */
|
||||
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
||||
int served = 0;
|
||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||
/* Protect against re-processing: don't serve clients that are already
|
||||
* in the unblocking list for any reason (including RM_UnblockClient()
|
||||
* explicit call). */
|
||||
if (bc->unblocked) return REDISMODULE_ERR;
|
||||
* explicit call).
|
||||
* For example, the following pathological case:
|
||||
* Assume a module called LIST implements the same command as
|
||||
* the Redis list data type.
|
||||
* LIST.BRPOPLPUSH src dst 0 ('src' goes into db->blocking_keys)
|
||||
* LIST.BRPOPLPUSH dst src 0 ('dst' goes into db->blocking_keys)
|
||||
* LIST.LPUSH src foo
|
||||
* 'src' is in db->blocking_keys after the first BRPOPLPUSH is served
|
||||
* (and stays there until the next beforeSleep).
|
||||
* The second BRPOPLPUSH will signal 'src' as ready, leading to the
|
||||
* unblocking of the already unblocked (and worst, freed) reply_client
|
||||
* of the first BRPOPLPUSH. */
|
||||
if (bc->unblocked) return 0;
|
||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
||||
ctx.blocked_ready_key = key;
|
||||
|
@ -3981,7 +3981,7 @@ sds genRedisInfoString(const char *section) {
|
||||
maxin, maxout,
|
||||
server.blocked_clients,
|
||||
server.tracking_clients,
|
||||
raxSize(server.clients_timeout_table));
|
||||
(unsigned long long)raxSize(server.clients_timeout_table));
|
||||
}
|
||||
|
||||
/* Memory */
|
||||
|
@ -109,41 +109,33 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
|
||||
|
||||
fsl->list[fsl->length++] = ele;
|
||||
|
||||
if (fsl->length >= 2)
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[1]);
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[1]);
|
||||
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
|
||||
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (!fsl || fsl->length < 2)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
RedisModule_ReplyWithArray(ctx, 2);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||
}
|
||||
|
||||
|
||||
/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
|
||||
/* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
|
||||
* When that happens, unblock client and pop the last two elements (from the right). */
|
||||
int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 3)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
@ -155,13 +147,10 @@ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
|
||||
return REDISMODULE_OK;
|
||||
|
||||
if (!fsl || fsl->length < 2) {
|
||||
/* Key is empty or has <2 elements, we must block */
|
||||
RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
|
||||
if (!fsl) {
|
||||
RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
|
||||
NULL, timeout, &argv[1], 1, NULL);
|
||||
} else {
|
||||
RedisModule_ReplyWithArray(ctx, 2);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
}
|
||||
|
||||
@ -175,10 +164,10 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||
long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
|
||||
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (!fsl || fsl->list[fsl->length-1] <= *pgt)
|
||||
if (fsl->list[fsl->length-1] <= *pgt)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
@ -218,7 +207,6 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
/* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
|
||||
long long *pgt = RedisModule_Alloc(sizeof(long long));
|
||||
*pgt = gt;
|
||||
/* Key is empty or has <2 elements, we must block */
|
||||
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
|
||||
bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
|
||||
} else {
|
||||
@ -228,6 +216,88 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
|
||||
RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||
|
||||
fsl_t *src;
|
||||
if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
fsl_t *dst;
|
||||
if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
long long ele = src->list[--src->length];
|
||||
dst->list[dst->length++] = ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, dst_keyname);
|
||||
return RedisModule_ReplyWithLongLong(ctx, ele);
|
||||
}
|
||||
|
||||
int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||
}
|
||||
|
||||
void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
|
||||
RedisModule_FreeString(ctx, privdata);
|
||||
}
|
||||
|
||||
/* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
|
||||
* When that happens, unblock client, pop the last element from <src> and push it to <dst>
|
||||
* (from the right). */
|
||||
int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 4)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
long long timeout;
|
||||
if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
|
||||
|
||||
fsl_t *src;
|
||||
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
|
||||
return REDISMODULE_OK;
|
||||
|
||||
if (!src) {
|
||||
/* Retain string for reply callback */
|
||||
RedisModule_RetainString(ctx, argv[2]);
|
||||
/* Key is empty, we must block */
|
||||
RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
|
||||
bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
|
||||
} else {
|
||||
fsl_t *dst;
|
||||
if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
|
||||
return REDISMODULE_OK;
|
||||
long long ele = src->list[--src->length];
|
||||
dst->list[dst->length++] = ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[2]);
|
||||
RedisModule_ReplyWithLongLong(ctx, ele);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* FSL.GETALL <key> - Reply with an array containing all elements. */
|
||||
int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
|
||||
return REDISMODULE_OK;
|
||||
|
||||
if (!fsl)
|
||||
return RedisModule_ReplyWithArray(ctx, 0);
|
||||
|
||||
RedisModule_ReplyWithArray(ctx, fsl->length);
|
||||
for (int i = 0; i < fsl->length; i++)
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -252,11 +322,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -3,37 +3,53 @@ set testmodule [file normalize tests/modules/blockonkeys.so]
|
||||
start_server {tags {"modules"}} {
|
||||
r module load $testmodule
|
||||
|
||||
test "Module client blocked on keys: Circular BPOPPUSH" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
|
||||
r del src dst
|
||||
|
||||
$rd1 fsl.bpoppush src dst 0
|
||||
$rd2 fsl.bpoppush dst src 0
|
||||
|
||||
r fsl.push src 42
|
||||
|
||||
assert_equal {42} [r fsl.getall src]
|
||||
assert_equal {} [r fsl.getall dst]
|
||||
}
|
||||
|
||||
test "Module client blocked on keys: Self-referential BPOPPUSH" {
|
||||
set rd1 [redis_deferring_client]
|
||||
|
||||
r del src
|
||||
|
||||
$rd1 fsl.bpoppush src src 0
|
||||
|
||||
r fsl.push src 42
|
||||
|
||||
assert_equal {42} [r fsl.getall src]
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (no metadata): No block} {
|
||||
r del k
|
||||
r fsl.push k 33
|
||||
r fsl.push k 34
|
||||
r fsl.bpop2 k 0
|
||||
} {34 33}
|
||||
r fsl.bpop k 0
|
||||
} {34}
|
||||
|
||||
test {Module client blocked on keys (no metadata): Timeout} {
|
||||
r del k
|
||||
set rd [redis_deferring_client]
|
||||
r fsl.push k 33
|
||||
$rd fsl.bpop2 k 1
|
||||
$rd fsl.bpop k 1
|
||||
assert_equal {Request timedout} [$rd read]
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (no metadata): Blocked, case 1} {
|
||||
test {Module client blocked on keys (no metadata): Blocked} {
|
||||
r del k
|
||||
set rd [redis_deferring_client]
|
||||
r fsl.push k 33
|
||||
$rd fsl.bpop2 k 0
|
||||
$rd fsl.bpop k 0
|
||||
r fsl.push k 34
|
||||
assert_equal {34 33} [$rd read]
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (no metadata): Blocked, case 2} {
|
||||
r del k
|
||||
set rd [redis_deferring_client]
|
||||
r fsl.push k 33
|
||||
r fsl.push k 34
|
||||
$rd fsl.bpop2 k 0
|
||||
assert_equal {34 33} [$rd read]
|
||||
assert_equal {34} [$rd read]
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): No block} {
|
||||
@ -108,13 +124,12 @@ start_server {tags {"modules"}} {
|
||||
test {Module client blocked on keys does not wake up on wrong type} {
|
||||
r del k
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpop2 k 0
|
||||
$rd fsl.bpop k 0
|
||||
r lpush k 12
|
||||
r lpush k 13
|
||||
r lpush k 14
|
||||
r del k
|
||||
r fsl.push k 33
|
||||
r fsl.push k 34
|
||||
assert_equal {34 33} [$rd read]
|
||||
assert_equal {34} [$rd read]
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user