mirror of
https://github.com/fluencelabs/redis
synced 2025-06-13 01:01:22 +00:00
Implements [B]Z[REV]POP and the respective unit tests
An implementation of the [Ze POP Redis Module](https://github.com/itamarhaber/zpop) as core Redis commands. Fixes #1861.
This commit is contained in:
@ -135,7 +135,9 @@ void processUnblockedClients(void) {
|
||||
/* Unblock a client calling the right function depending on the kind
|
||||
* of operation the client is blocking for. */
|
||||
void unblockClient(client *c) {
|
||||
if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
|
||||
if (c->btype == BLOCKED_LIST ||
|
||||
c->btype == BLOCKED_ZSET ||
|
||||
c->btype == BLOCKED_STREAM) {
|
||||
unblockClientWaitingData(c);
|
||||
} else if (c->btype == BLOCKED_WAIT) {
|
||||
unblockClientWaitingReplicas(c);
|
||||
@ -162,7 +164,9 @@ void unblockClient(client *c) {
|
||||
* send it a reply of some kind. After this function is called,
|
||||
* unblockClient() will be called with the same client as argument. */
|
||||
void replyToBlockedClientTimedOut(client *c) {
|
||||
if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
|
||||
if (c->btype == BLOCKED_LIST ||
|
||||
c->btype == BLOCKED_ZSET ||
|
||||
c->btype == BLOCKED_STREAM) {
|
||||
addReply(c,shared.nullmultibulk);
|
||||
} else if (c->btype == BLOCKED_WAIT) {
|
||||
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
||||
@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
client *receiver = clientnode->value;
|
||||
|
||||
if (receiver->btype != BLOCKED_LIST) {
|
||||
/* Put on the tail, so that at the next call
|
||||
/* Put at the tail, so that at the next call
|
||||
* we'll not run into it again. */
|
||||
listDelNode(clients,clientnode);
|
||||
listAddNodeTail(clients,receiver);
|
||||
@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* when an element was pushed on the list. */
|
||||
}
|
||||
|
||||
/* Serve clients blocked on sorted set key. */
|
||||
else if (o != NULL && o->type == OBJ_ZSET) {
|
||||
dictEntry *de;
|
||||
|
||||
/* We serve clients in the same order they blocked for
|
||||
* this key, from the first blocked to the last. */
|
||||
de = dictFind(rl->db->blocking_keys,rl->key);
|
||||
if (de) {
|
||||
list *clients = dictGetVal(de);
|
||||
int numclients = listLength(clients);
|
||||
|
||||
while(numclients--) {
|
||||
listNode *clientnode = listFirst(clients);
|
||||
client *receiver = clientnode->value;
|
||||
|
||||
if (receiver->btype != BLOCKED_ZSET) {
|
||||
/* Put at the tail, so that at the next call
|
||||
* we'll not run into it again. */
|
||||
listDelNode(clients,clientnode);
|
||||
listAddNodeTail(clients,receiver);
|
||||
continue;
|
||||
}
|
||||
|
||||
int reverse = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == bzpopCommand) ?
|
||||
0 : 1;
|
||||
unblockClient(receiver);
|
||||
genericZpopCommand(receiver,&rl->key,1,reverse);
|
||||
|
||||
propagate(reverse ?
|
||||
server.zrevpopCommand : server.zpopCommand,
|
||||
receiver->db->id,receiver->argv,receiver->argc,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Serve clients blocked on stream key. */
|
||||
else if (o != NULL && o->type == OBJ_STREAM) {
|
||||
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||
@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) {
|
||||
}
|
||||
}
|
||||
|
||||
/* This is how the current blocking lists/streams work, we use BLPOP as
|
||||
* example, but the concept is the same for other list ops and XREAD.
|
||||
/* This is how the current blocking lists/sorted sets/streams work, we use
|
||||
* BLPOP as example, but the concept is the same for other list ops, sorted
|
||||
* sets and XREAD.
|
||||
* - If the user calls BLPOP and the key exists and contains a non empty list
|
||||
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
||||
* if blocking is not required.
|
||||
@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* to the number of elements we have in the ready list.
|
||||
*/
|
||||
|
||||
/* Set a client in blocking mode for the specified key (list or stream), with
|
||||
* the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
|
||||
* depending on the kind of operation we are waiting for an empty key in
|
||||
* order to awake the client. The client is blocked for all the 'numkeys'
|
||||
* keys as in the 'keys' argument. When we block for stream keys, we also
|
||||
* provide an array of streamID structures: clients will be unblocked only
|
||||
* when items with an ID greater or equal to the specified one is appended
|
||||
* to the stream. */
|
||||
/* Set a client in blocking mode for the specified key (list, zset or stream),
|
||||
* with the specified timeout. The 'type' argument is BLOCKED_LIST,
|
||||
* BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
|
||||
* waiting for an empty key in order to awake the client. The client is blocked
|
||||
* for all the 'numkeys' keys as in the 'keys' argument. When we block for
|
||||
* stream keys, we also provide an array of streamID structures: clients will
|
||||
* be unblocked only when items with an ID greater or equal to the specified
|
||||
* one is appended to the stream. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
|
||||
dictEntry *de;
|
||||
list *l;
|
||||
@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
/* The value associated with the key name in the bpop.keys dictionary
|
||||
* is NULL for lists, or the stream ID for streams. */
|
||||
* is NULL for lists and sorted sets, or the stream ID for streams. */
|
||||
void *key_data = NULL;
|
||||
if (btype == BLOCKED_STREAM) {
|
||||
key_data = zmalloc(sizeof(streamID));
|
||||
|
Reference in New Issue
Block a user