mirror of
https://github.com/fluencelabs/redis
synced 2025-05-29 18:21:20 +00:00
Fix for issue 516, rewriting the command vector to correctly repliate BRPOPLPUSH. Still to test everything, especially edge cases
This commit is contained in:
parent
afc151d3ba
commit
3769dbd722
63
src/t_list.c
63
src/t_list.c
@ -624,6 +624,31 @@ void lremCommand(redisClient *c) {
|
|||||||
if (removed) touchWatchedKey(c->db,c->argv[1]);
|
if (removed) touchWatchedKey(c->db,c->argv[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rewriteClientCommandVector(redisClient *c, int argc, ...) {
|
||||||
|
va_list ap;
|
||||||
|
int j;
|
||||||
|
robj **argv; /* The new argument vector */
|
||||||
|
|
||||||
|
argv = zmalloc(sizeof(robj*)*argc);
|
||||||
|
va_start(ap,argc);
|
||||||
|
for (j = 0; j < argc; j++) {
|
||||||
|
robj *a;
|
||||||
|
|
||||||
|
a = va_arg(ap, robj*);
|
||||||
|
argv[j] = a;
|
||||||
|
incrRefCount(a);
|
||||||
|
}
|
||||||
|
/* We free the objects in the original vector at the end, so we are
|
||||||
|
* sure that if the same objects are reused in the new vector the
|
||||||
|
* refcount gets incremented before it gets decremented. */
|
||||||
|
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
|
||||||
|
zfree(c->argv);
|
||||||
|
/* Replace argv and argc with our new versions. */
|
||||||
|
c->argv = argv;
|
||||||
|
c->argc = argc;
|
||||||
|
va_end(ap);
|
||||||
|
}
|
||||||
|
|
||||||
/* This is the semantic of this command:
|
/* This is the semantic of this command:
|
||||||
* RPOPLPUSH srclist dstlist:
|
* RPOPLPUSH srclist dstlist:
|
||||||
* IF LLEN(srclist) > 0
|
* IF LLEN(srclist) > 0
|
||||||
@ -640,7 +665,9 @@ void lremCommand(redisClient *c) {
|
|||||||
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
|
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
|
||||||
|
robj *aux;
|
||||||
|
|
||||||
if (!handleClientsWaitingListPush(c,dstkey,value)) {
|
if (!handleClientsWaitingListPush(c,dstkey,value)) {
|
||||||
/* Create the list if the key does not exist */
|
/* Create the list if the key does not exist */
|
||||||
if (!dstobj) {
|
if (!dstobj) {
|
||||||
@ -648,9 +675,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
|
|||||||
dbAdd(c->db,dstkey,dstobj);
|
dbAdd(c->db,dstkey,dstobj);
|
||||||
} else {
|
} else {
|
||||||
touchWatchedKey(c->db,dstkey);
|
touchWatchedKey(c->db,dstkey);
|
||||||
server.dirty++;
|
|
||||||
}
|
}
|
||||||
listTypePush(dstobj,value,REDIS_HEAD);
|
listTypePush(dstobj,value,REDIS_HEAD);
|
||||||
|
/* If we are pushing as a result of LPUSH against a key
|
||||||
|
* watched by BLPOPLPUSH, we need to rewrite the command vector.
|
||||||
|
* But if this is called directly by RPOPLPUSH (either directly
|
||||||
|
* or via a BRPOPLPUSH where the popped list exists)
|
||||||
|
* we should replicate the BRPOPLPUSH command itself. */
|
||||||
|
if (c != origclient) {
|
||||||
|
aux = createStringObject("LPUSH",5);
|
||||||
|
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
|
||||||
|
decrRefCount(aux);
|
||||||
|
} else {
|
||||||
|
/* Make sure to always use RPOPLPUSH in the replication / AOF,
|
||||||
|
* even if the original command was BRPOPLPUSH. */
|
||||||
|
aux = createStringObject("RPOPLPUSH",9);
|
||||||
|
rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
|
||||||
|
decrRefCount(aux);
|
||||||
|
}
|
||||||
|
server.dirty++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Always send the pushed value to the client. */
|
/* Always send the pushed value to the client. */
|
||||||
@ -666,16 +709,22 @@ void rpoplpushCommand(redisClient *c) {
|
|||||||
addReply(c,shared.nullbulk);
|
addReply(c,shared.nullbulk);
|
||||||
} else {
|
} else {
|
||||||
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
||||||
|
robj *touchedkey = c->argv[1];
|
||||||
|
|
||||||
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
|
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
|
||||||
value = listTypePop(sobj,REDIS_TAIL);
|
value = listTypePop(sobj,REDIS_TAIL);
|
||||||
rpoplpushHandlePush(c,c->argv[2],dobj,value);
|
/* We saved touched key, and protect it, since rpoplpushHandlePush
|
||||||
|
* may change the client command argument vector. */
|
||||||
|
incrRefCount(touchedkey);
|
||||||
|
rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
|
||||||
|
|
||||||
/* listTypePop returns an object with its refcount incremented */
|
/* listTypePop returns an object with its refcount incremented */
|
||||||
decrRefCount(value);
|
decrRefCount(value);
|
||||||
|
|
||||||
/* Delete the source list when it is empty */
|
/* Delete the source list when it is empty */
|
||||||
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
|
if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
|
||||||
touchWatchedKey(c->db,c->argv[1]);
|
touchWatchedKey(c->db,touchedkey);
|
||||||
|
decrRefCount(touchedkey);
|
||||||
server.dirty++;
|
server.dirty++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -828,14 +877,14 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
|||||||
addReplyMultiBulkLen(receiver,2);
|
addReplyMultiBulkLen(receiver,2);
|
||||||
addReplyBulk(receiver,key);
|
addReplyBulk(receiver,key);
|
||||||
addReplyBulk(receiver,ele);
|
addReplyBulk(receiver,ele);
|
||||||
return 1;
|
return 1; /* Serve just the first client as in B[RL]POP semantics */
|
||||||
} else {
|
} else {
|
||||||
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
|
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
|
||||||
dstobj = lookupKeyWrite(receiver->db,dstkey);
|
dstobj = lookupKeyWrite(receiver->db,dstkey);
|
||||||
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
|
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
|
||||||
decrRefCount(dstkey);
|
decrRefCount(dstkey);
|
||||||
} else {
|
} else {
|
||||||
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
|
rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
|
||||||
decrRefCount(dstkey);
|
decrRefCount(dstkey);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user