mirror of
https://github.com/fluencelabs/redis
synced 2025-06-18 03:31:21 +00:00
Merge branch 'unstable' into 5.0 branch
This commit is contained in:
135
src/t_stream.c
135
src/t_stream.c
@ -207,7 +207,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
|
||||
|
||||
/* Create a new listpack and radix tree node if needed. Note that when
|
||||
* a new listpack is created, we populate it with a "master entry". This
|
||||
* is just a set of fields that is taken as refernce in order to compress
|
||||
* is just a set of fields that is taken as references in order to compress
|
||||
* the stream entries that we'll add inside the listpack.
|
||||
*
|
||||
* Note that while we use the first added entry fields to create
|
||||
@ -469,7 +469,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
|
||||
* iteration is from the start to the end element (inclusive), otherwise
|
||||
* if rev is non-zero, the iteration is reversed.
|
||||
*
|
||||
* Once the iterator is initalized, we iterate like this:
|
||||
* Once the iterator is initialized, we iterate like this:
|
||||
*
|
||||
* streamIterator myiterator;
|
||||
* streamIteratorStart(&myiterator,...);
|
||||
@ -705,10 +705,22 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
|
||||
/* Change the valid/deleted entries count in the master entry. */
|
||||
unsigned char *p = lpFirst(lp);
|
||||
aux = lpGetInteger(p);
|
||||
lp = lpReplaceInteger(lp,&p,aux-1);
|
||||
p = lpNext(lp,p); /* Seek deleted field. */
|
||||
aux = lpGetInteger(p);
|
||||
lp = lpReplaceInteger(lp,&p,aux+1);
|
||||
|
||||
if (aux == 1) {
|
||||
/* If this is the last element in the listpack, we can remove the whole
|
||||
* node. */
|
||||
lpFree(lp);
|
||||
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
|
||||
} else {
|
||||
/* In the base case we alter the counters of valid/deleted entries. */
|
||||
lp = lpReplaceInteger(lp,&p,aux-1);
|
||||
p = lpNext(lp,p); /* Seek deleted field. */
|
||||
aux = lpGetInteger(p);
|
||||
lp = lpReplaceInteger(lp,&p,aux+1);
|
||||
}
|
||||
|
||||
/* Update the number of entries counter. */
|
||||
si->stream->length--;
|
||||
|
||||
/* Update the number of entries counter. */
|
||||
si->stream->length--;
|
||||
@ -834,7 +846,7 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
|
||||
* given, but currently such a feature is never used by the code base that
|
||||
* will always pass 'spi' and propagate when a group is passed.
|
||||
*
|
||||
* Note that this function is recursive in certian cases. When it's called
|
||||
* Note that this function is recursive in certain cases. When it's called
|
||||
* with a non NULL group and consumer argument, it may call
|
||||
* streamReplyWithRangeFromConsumerPEL() in order to get entries from the
|
||||
* consumer pending entries list. However such a function will then call
|
||||
@ -905,24 +917,26 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
||||
* will not require extra lookups. We'll fix the problem later
|
||||
* if we find that there is already a entry for this ID. */
|
||||
streamNACK *nack = streamCreateNACK(consumer);
|
||||
int retval = 0;
|
||||
retval += raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
|
||||
retval += raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
|
||||
int group_inserted =
|
||||
raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
|
||||
int consumer_inserted =
|
||||
raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
|
||||
|
||||
/* Now we can check if the entry was already busy, and
|
||||
* in that case reassign the entry to the new consumer. */
|
||||
if (retval == 0) {
|
||||
* in that case reassign the entry to the new consumer,
|
||||
* or update it if the consumer is the same as before. */
|
||||
if (group_inserted == 0) {
|
||||
streamFreeNACK(nack);
|
||||
nack = raxFind(group->pel,buf,sizeof(buf));
|
||||
serverAssert(nack != raxNotFound);
|
||||
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
||||
/* Update the consumer and idle time. */
|
||||
/* Update the consumer and NACK metadata. */
|
||||
nack->consumer = consumer;
|
||||
nack->delivery_time = mstime();
|
||||
nack->delivery_count++;
|
||||
nack->delivery_count = 1;
|
||||
/* Add the entry in the new consumer local PEL. */
|
||||
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
|
||||
} else if (retval == 1) {
|
||||
} else if (group_inserted == 1 && consumer_inserted == 0) {
|
||||
serverPanic("NACK half-created. Should not be possible.");
|
||||
}
|
||||
|
||||
@ -945,7 +959,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
||||
/* This is an helper function for streamReplyWithRange() when called with
|
||||
* group and consumer arguments, but with a range that is referring to already
|
||||
* delivered messages. In this case we just emit messages that are already
|
||||
* in the history of the conusmer, fetching the IDs from its PEL.
|
||||
* in the history of the consumer, fetching the IDs from its PEL.
|
||||
*
|
||||
* Note that this function does not have a 'rev' argument because it's not
|
||||
* possible to iterate in reverse using a group. Basically this function
|
||||
@ -1081,7 +1095,7 @@ invalid:
|
||||
void xaddCommand(client *c) {
|
||||
streamID id;
|
||||
int id_given = 0; /* Was an ID different than "*" specified? */
|
||||
long long maxlen = 0; /* 0 means no maximum length. */
|
||||
long long maxlen = -1; /* If left to -1 no trimming is performed. */
|
||||
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
|
||||
the maxium length is not applied verbatim. */
|
||||
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
|
||||
@ -1105,6 +1119,11 @@ void xaddCommand(client *c) {
|
||||
}
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
|
||||
!= C_OK) return;
|
||||
|
||||
if (maxlen < 0) {
|
||||
addReplyError(c,"The MAXLEN argument must be >= 0.");
|
||||
return;
|
||||
}
|
||||
i++;
|
||||
maxlen_arg_idx = i;
|
||||
} else {
|
||||
@ -1144,7 +1163,7 @@ void xaddCommand(client *c) {
|
||||
server.dirty++;
|
||||
|
||||
/* Remove older elements if MAXLEN was specified. */
|
||||
if (maxlen) {
|
||||
if (maxlen >= 0) {
|
||||
if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
|
||||
/* If no trimming was performed, for instance because approximated
|
||||
* trimming length was specified, rewrite the MAXLEN argument
|
||||
@ -1335,6 +1354,14 @@ void xreadCommand(client *c) {
|
||||
}
|
||||
|
||||
if (strcmp(c->argv[i]->ptr,"$") == 0) {
|
||||
if (xreadgroup) {
|
||||
addReplyError(c,"The $ ID is meaningless in the context of "
|
||||
"XREADGROUP: you want to read the history of "
|
||||
"this consumer by specifying a proper ID, or "
|
||||
"use the > ID to get new messages. The $ ID would "
|
||||
"just return an empty result set.");
|
||||
goto cleanup;
|
||||
}
|
||||
if (o) {
|
||||
stream *s = o->ptr;
|
||||
ids[id_idx] = s->last_id;
|
||||
@ -1344,13 +1371,17 @@ void xreadCommand(client *c) {
|
||||
}
|
||||
continue;
|
||||
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
|
||||
if (!xreadgroup || groupname == NULL) {
|
||||
if (!xreadgroup) {
|
||||
addReplyError(c,"The > ID can be specified only when calling "
|
||||
"XREADGROUP using the GROUP <group> "
|
||||
"<consumer> option.");
|
||||
goto cleanup;
|
||||
}
|
||||
ids[id_idx] = group->last_id;
|
||||
/* We use just the maximum ID to signal this is a ">" ID, anyway
|
||||
* the code handling the blocking clients will have to update the
|
||||
* ID later in order to match the changing consumer group last ID. */
|
||||
ids[id_idx].ms = UINT64_MAX;
|
||||
ids[id_idx].seq = UINT64_MAX;
|
||||
continue;
|
||||
}
|
||||
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
|
||||
@ -1365,9 +1396,36 @@ void xreadCommand(client *c) {
|
||||
if (o == NULL) continue;
|
||||
stream *s = o->ptr;
|
||||
streamID *gt = ids+i; /* ID must be greater than this. */
|
||||
if (s->last_id.ms > gt->ms ||
|
||||
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
|
||||
{
|
||||
int serve_synchronously = 0;
|
||||
|
||||
/* Check if there are the conditions to serve the client synchronously. */
|
||||
if (groups) {
|
||||
/* If the consumer is blocked on a group, we always serve it
|
||||
* synchronously (serving its local history) if the ID specified
|
||||
* was not the special ">" ID. */
|
||||
if (gt->ms != UINT64_MAX ||
|
||||
gt->seq != UINT64_MAX)
|
||||
{
|
||||
serve_synchronously = 1;
|
||||
} else {
|
||||
/* We also want to serve a consumer in a consumer group
|
||||
* synchronously in case the group top item delivered is smaller
|
||||
* than what the stream has inside. */
|
||||
streamID *last = &groups[i]->last_id;
|
||||
if (streamCompareID(&s->last_id, last) > 0) {
|
||||
serve_synchronously = 1;
|
||||
*gt = *last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* For consumers without a group, we serve synchronously if we can
|
||||
* actually provide at least one item from the stream. */
|
||||
if (streamCompareID(&s->last_id, gt) > 0) {
|
||||
serve_synchronously = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (serve_synchronously) {
|
||||
arraylen++;
|
||||
if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||
/* streamReplyWithRange() handles the 'start' ID as inclusive,
|
||||
@ -1421,6 +1479,7 @@ void xreadCommand(client *c) {
|
||||
incrRefCount(consumername);
|
||||
c->bpop.xread_group = groupname;
|
||||
c->bpop.xread_consumer = consumername;
|
||||
c->bpop.xread_group_noack = noack;
|
||||
} else {
|
||||
c->bpop.xread_group = NULL;
|
||||
c->bpop.xread_consumer = NULL;
|
||||
@ -1566,7 +1625,7 @@ void xgroupCommand(client *c) {
|
||||
"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
|
||||
"SETID <key> <groupname> <id or $> -- Set the current group ID.",
|
||||
"DESTROY <key> <groupname> -- Remove the specified group.",
|
||||
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified conusmer.",
|
||||
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
|
||||
"HELP -- Prints this help.",
|
||||
NULL
|
||||
};
|
||||
@ -1645,7 +1704,7 @@ NULL
|
||||
} else if (!strcasecmp(opt,"HELP")) {
|
||||
addReplyHelp(c, help);
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2062,7 +2121,7 @@ void xclaimCommand(client *c) {
|
||||
/* XDEL <key> [<ID1> <ID2> ... <IDN>]
|
||||
*
|
||||
* Removes the specified entries from the stream. Returns the number
|
||||
* of items actaully deleted, that may be different from the number
|
||||
* of items actually deleted, that may be different from the number
|
||||
* of IDs passed in case certain IDs do not exist. */
|
||||
void xdelCommand(client *c) {
|
||||
robj *o;
|
||||
@ -2073,21 +2132,25 @@ void xdelCommand(client *c) {
|
||||
|
||||
/* We need to sanity check the IDs passed to start. Even if not
|
||||
* a big issue, it is not great that the command is only partially
|
||||
* executed becuase at some point an invalid ID is parsed. */
|
||||
* executed because at some point an invalid ID is parsed. */
|
||||
streamID id;
|
||||
for (int j = 2; j < c->argc; j++) {
|
||||
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
||||
}
|
||||
|
||||
/* Actaully apply the command. */
|
||||
/* Actually apply the command. */
|
||||
int deleted = 0;
|
||||
for (int j = 2; j < c->argc; j++) {
|
||||
streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
|
||||
deleted += streamDeleteItem(s,&id);
|
||||
}
|
||||
signalModifiedKey(c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
|
||||
server.dirty += deleted;
|
||||
|
||||
/* Propagate the write if needed. */
|
||||
if (deleted) {
|
||||
signalModifiedKey(c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
addReplyLongLong(c,deleted);
|
||||
}
|
||||
|
||||
@ -2236,18 +2299,20 @@ NULL
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
while(raxNext(&ri)) {
|
||||
streamCG *cg = ri.data;
|
||||
addReplyMultiBulkLen(c,6);
|
||||
addReplyMultiBulkLen(c,8);
|
||||
addReplyStatus(c,"name");
|
||||
addReplyBulkCBuffer(c,ri.key,ri.key_len);
|
||||
addReplyStatus(c,"consumers");
|
||||
addReplyLongLong(c,raxSize(cg->consumers));
|
||||
addReplyStatus(c,"pending");
|
||||
addReplyLongLong(c,raxSize(cg->pel));
|
||||
addReplyStatus(c,"last-delivered-id");
|
||||
addReplyStreamID(c,&cg->last_id);
|
||||
}
|
||||
raxStop(&ri);
|
||||
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
|
||||
/* XINFO STREAM <key> (or the alias XINFO <key>). */
|
||||
addReplyMultiBulkLen(c,12);
|
||||
addReplyMultiBulkLen(c,14);
|
||||
addReplyStatus(c,"length");
|
||||
addReplyLongLong(c,s->length);
|
||||
addReplyStatus(c,"radix-tree-keys");
|
||||
@ -2256,6 +2321,8 @@ NULL
|
||||
addReplyLongLong(c,s->rax->numnodes);
|
||||
addReplyStatus(c,"groups");
|
||||
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
|
||||
addReplyStatus(c,"last-generated-id");
|
||||
addReplyStreamID(c,&s->last_id);
|
||||
|
||||
/* To emit the first/last entry we us the streamReplyWithRange()
|
||||
* API. */
|
||||
@ -2272,7 +2339,7 @@ NULL
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!count) addReply(c,shared.nullbulk);
|
||||
} else {
|
||||
addReplyError(c,"syntax error, try 'XINFO HELP'");
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user