From 20867e8009876e50ae1536203a097f4338598943 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 May 2011 13:49:03 +0200 Subject: [PATCH 1/5] Fix for issue 516, rewriting the command vector to correctly repliate BRPOPLPUSH. Still to test everything, especially edge cases --- src/t_list.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index b85cd2c1..6e84ba42 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -619,6 +619,31 @@ void lremCommand(redisClient *c) { if (removed) touchWatchedKey(c->db,c->argv[1]); } +void rewriteClientCommandVector(redisClient *c, int argc, ...) { + va_list ap; + int j; + robj **argv; /* The new argument vector */ + + argv = zmalloc(sizeof(robj*)*argc); + va_start(ap,argc); + for (j = 0; j < argc; j++) { + robj *a; + + a = va_arg(ap, robj*); + argv[j] = a; + incrRefCount(a); + } + /* We free the objects in the original vector at the end, so we are + * sure that if the same objects are reused in the new vector the + * refcount gets incremented before it gets decremented. */ + for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + /* Replace argv and argc with our new versions. */ + c->argv = argv; + c->argc = argc; + va_end(ap); +} + /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: * IF LLEN(srclist) > 0 @@ -635,7 +660,9 @@ void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { +void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + robj *aux; + if (!handleClientsWaitingListPush(c,dstkey,value)) { /* Create the list if the key does not exist */ if (!dstobj) { @@ -643,9 +670,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value dbAdd(c->db,dstkey,dstobj); } else { touchWatchedKey(c->db,dstkey); - server.dirty++; } listTypePush(dstobj,value,REDIS_HEAD); + /* If we are pushing as a result of LPUSH against a key + * watched by BLPOPLPUSH, we need to rewrite the command vector. + * But if this is called directly by RPOPLPUSH (either directly + * or via a BRPOPLPUSH where the popped list exists) + * we should replicate the BRPOPLPUSH command itself. */ + if (c != origclient) { + aux = createStringObject("LPUSH",5); + rewriteClientCommandVector(origclient,3,aux,dstkey,value); + decrRefCount(aux); + } else { + /* Make sure to always use RPOPLPUSH in the replication / AOF, + * even if the original command was BRPOPLPUSH. */ + aux = createStringObject("RPOPLPUSH",9); + rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]); + decrRefCount(aux); + } + server.dirty++; } /* Always send the pushed value to the client. */ @@ -661,16 +704,22 @@ void rpoplpushCommand(redisClient *c) { addReply(c,shared.nullbulk); } else { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); + robj *touchedkey = c->argv[1]; + if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); - rpoplpushHandlePush(c,c->argv[2],dobj,value); + /* We saved touched key, and protect it, since rpoplpushHandlePush + * may change the client command argument vector. */ + incrRefCount(touchedkey); + rpoplpushHandlePush(c,c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); /* Delete the source list when it is empty */ - if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); + if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey); + touchWatchedKey(c->db,touchedkey); + decrRefCount(touchedkey); server.dirty++; } } @@ -823,14 +872,14 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); - return 1; + return 1; /* Serve just the first client as in B[RL]POP semantics */ } else { /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ dstobj = lookupKeyWrite(receiver->db,dstkey); if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { decrRefCount(dstkey); } else { - rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele); decrRefCount(dstkey); return 1; } From d7061f813714e2ec49d4d335fe3fdba092529f2b Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 May 2011 14:09:50 +0200 Subject: [PATCH 2/5] Removed a leak in the BRPOPLPUSH unrelated to issue 561 --- src/t_list.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/t_list.c b/src/t_list.c index 6e84ba42..e945ed8b 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -821,6 +821,7 @@ void unblockClientWaitingData(redisClient *c) { /* Cleanup the client structure */ zfree(c->bpop.keys); c->bpop.keys = NULL; + if (c->bpop.target) decrRefCount(c->bpop.target); c->bpop.target = NULL; c->flags &= ~REDIS_BLOCKED; c->flags |= REDIS_UNBLOCKED; From b190b0c98faff697dabbbde5524a5223a16a8bdb Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 May 2011 15:01:20 +0200 Subject: [PATCH 3/5] modified the BRPOPLPUSH target field cleanup strategy to fix it the proper way. --- src/t_list.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index e945ed8b..00ae1edc 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -865,6 +865,9 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { receiver = ln->value; dstkey = receiver->bpop.target; + /* Protect receiver->bpop.target, that will be freed by + * the next unblockClientWaitingData() call. */ + if (dstkey) incrRefCount(dstkey); /* This should remove the first element of the "clients" list. */ unblockClientWaitingData(receiver); @@ -877,13 +880,12 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { } else { /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ dstobj = lookupKeyWrite(receiver->db,dstkey); - if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { - decrRefCount(dstkey); - } else { + if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) { rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele); decrRefCount(dstkey); return 1; } + decrRefCount(dstkey); } } From 196fc32b77f47f51431015ccf4e7f20ce94bd5e2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 May 2011 15:39:31 +0200 Subject: [PATCH 4/5] use the new rewriteClientCommandVector() function for SPOP -> SREM replication translation as well. --- src/networking.c | 25 +++++++++++++++++++++++++ src/redis.h | 1 + src/t_list.c | 25 ------------------------- src/t_set.c | 17 ++++++----------- 4 files changed, 32 insertions(+), 36 deletions(-) diff --git a/src/networking.c b/src/networking.c index 9c250cdd..7d4b9636 100644 --- a/src/networking.c +++ b/src/networking.c @@ -872,3 +872,28 @@ void getClientsMaxBuffers(unsigned long *longest_output_list, *biggest_input_buffer = bib; } +void rewriteClientCommandVector(redisClient *c, int argc, ...) { + va_list ap; + int j; + robj **argv; /* The new argument vector */ + + argv = zmalloc(sizeof(robj*)*argc); + va_start(ap,argc); + for (j = 0; j < argc; j++) { + robj *a; + + a = va_arg(ap, robj*); + argv[j] = a; + incrRefCount(a); + } + /* We free the objects in the original vector at the end, so we are + * sure that if the same objects are reused in the new vector the + * refcount gets incremented before it gets decremented. */ + for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + /* Replace argv and argc with our new versions. */ + c->argv = argv; + c->argc = argc; + va_end(ap); +} + diff --git a/src/redis.h b/src/redis.h index 7c147179..7dcca3be 100644 --- a/src/redis.h +++ b/src/redis.h @@ -661,6 +661,7 @@ void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer); +void rewriteClientCommandVector(redisClient *c, int argc, ...); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) diff --git a/src/t_list.c b/src/t_list.c index 00ae1edc..c05bd1ae 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -619,31 +619,6 @@ void lremCommand(redisClient *c) { if (removed) touchWatchedKey(c->db,c->argv[1]); } -void rewriteClientCommandVector(redisClient *c, int argc, ...) { - va_list ap; - int j; - robj **argv; /* The new argument vector */ - - argv = zmalloc(sizeof(robj*)*argc); - va_start(ap,argc); - for (j = 0; j < argc; j++) { - robj *a; - - a = va_arg(ap, robj*); - argv[j] = a; - incrRefCount(a); - } - /* We free the objects in the original vector at the end, so we are - * sure that if the same objects are reused in the new vector the - * refcount gets incremented before it gets decremented. */ - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); - zfree(c->argv); - /* Replace argv and argc with our new versions. */ - c->argv = argv; - c->argc = argc; - va_end(ap); -} - /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: * IF LLEN(srclist) > 0 diff --git a/src/t_set.c b/src/t_set.c index bffe873a..84f736c5 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -325,7 +325,7 @@ void scardCommand(redisClient *c) { } void spopCommand(redisClient *c) { - robj *set, *ele; + robj *set, *ele, *aux; int64_t llele; int encoding; @@ -341,16 +341,11 @@ void spopCommand(redisClient *c) { setTypeRemove(set,ele); } - /* Change argv to replicate as SREM */ - c->argc = 3; - c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc)); - - /* Overwrite SREM with SPOP (same length) */ - redisAssert(sdslen(c->argv[0]->ptr) == 4); - memcpy(c->argv[0]->ptr, "SREM", 4); - - /* Popped element already has incremented refcount */ - c->argv[2] = ele; + /* Replicate/AOF this command as an SREM operation */ + aux = createStringObject("SREM",4); + rewriteClientCommandVector(c,3,aux,c->argv[1],ele); + decrRefCount(ele); + decrRefCount(aux); addReplyBulk(c,ele); if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); From dfc74051ce4243209e26bea8dbdda2ee00a9cf06 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 May 2011 17:16:33 +0200 Subject: [PATCH 5/5] test for the BRPOPLPUSH issue 561 related issues --- tests/integration/replication.tcl | 18 ++++++++++++++++++ tests/unit/type/list.tcl | 8 ++++++++ 2 files changed, 26 insertions(+) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 892fae03..227356b2 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -6,6 +6,24 @@ start_server {tags {"repl"}} { s -1 role } {slave} + test {BRPOPLPUSH replication, when blocking against empty list} { + set rd [redis_deferring_client] + $rd brpoplpush a b 5 + r lpush a foo + after 1000 + assert_equal [r debug digest] [r -1 debug digest] + } + + test {BRPOPLPUSH replication, list exists} { + set rd [redis_deferring_client] + r lpush c 1 + r lpush c 2 + r lpush c 3 + $rd brpoplpush c d 5 + after 1000 + assert_equal [r debug digest] [r -1 debug digest] + } + test {MASTER and SLAVE dataset should be identical after complex ops} { createComplexDataset r 10000 after 500 diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 6b128b72..84c95c2e 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -262,6 +262,14 @@ start_server { r exec } {foo bar {} {} {bar foo}} + test {BRPOPLPUSH timeout} { + set rd [redis_deferring_client] + + $rd brpoplpush foo_list bar_list 1 + after 2000 + $rd read + } {} + foreach {pop} {BLPOP BRPOP} { test "$pop: with single empty list argument" { set rd [redis_deferring_client]