mirror of
https://github.com/fluencelabs/redis
synced 2025-06-12 16:51:22 +00:00
CG: consumer lookup + initial streamReplyWithRange() work to supprot CG.
This commit is contained in:
@ -42,6 +42,7 @@
|
||||
|
||||
void streamFreeCG(streamCG *cg);
|
||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Low level stream encoding: a radix tree of listpacks.
|
||||
@ -659,8 +660,19 @@ void streamIteratorStop(streamIterator *si) {
|
||||
* receive is between start and end inclusive, if 'count' is non zero, no more
|
||||
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
|
||||
* we want all the elements from 'start' till the end of the stream. If 'rev'
|
||||
* is non zero, elements are produced in reversed order from end to start. */
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
|
||||
* is non zero, elements are produced in reversed order from end to start.
|
||||
*
|
||||
* If group and consumer are not NULL, the function performs additional work:
|
||||
* 1. It updates the last delivered ID in the group in case we are
|
||||
* sending IDs greater than the current last ID.
|
||||
* 2. If the requested IDs are already assigned to some other consumer, the
|
||||
* function will not return it to the client.
|
||||
* 3. An entry in the pending list will be created for every entry delivered
|
||||
* for the first time to this consumer. This is only performed if
|
||||
* consumer != NULL, so in order to implement the XREADGROUP NOACK option
|
||||
* no consumer is passed to this function.
|
||||
*/
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) {
|
||||
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||
size_t arraylen = 0;
|
||||
streamIterator si;
|
||||
@ -903,7 +915,7 @@ void xrangeGenericCommand(client *c, int rev) {
|
||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|
||||
|| checkType(c,o,OBJ_STREAM)) return;
|
||||
s = o->ptr;
|
||||
streamReplyWithRange(c,s,&startid,&endid,count,rev);
|
||||
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL);
|
||||
}
|
||||
|
||||
/* XRANGE key start end [COUNT <n>] */
|
||||
@ -1063,13 +1075,18 @@ void xreadCommand(client *c) {
|
||||
* so start from the next ID, since we want only messages with
|
||||
* IDs greater than start. */
|
||||
streamID start = *gt;
|
||||
start.seq++; /* Can't overflow, it's an uint64_t */
|
||||
start.seq++; /* uint64_t can't overflow in this context. */
|
||||
|
||||
/* Emit the two elements sub-array consisting of the name
|
||||
* of the stream and the data we extracted from it. */
|
||||
addReplyMultiBulkLen(c,2);
|
||||
addReplyBulk(c,c->argv[i+streams_arg]);
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0);
|
||||
streamConsumer *consumer = NULL;
|
||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr);
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0,
|
||||
groups ? groups[i] : NULL,
|
||||
consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1118,6 +1135,7 @@ void streamNotAckedFree(streamNotAcked *na) {
|
||||
}
|
||||
|
||||
void streamConsumerFree(streamConsumer *sc) {
|
||||
zfree(sc->name);
|
||||
zfree(sc);
|
||||
}
|
||||
|
||||
@ -1154,6 +1172,22 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
|
||||
return (cg == raxNotFound) ? NULL : cg;
|
||||
}
|
||||
|
||||
/* Lookup the consumer with the specified name in the group 'cg': if the
|
||||
* consumer does not exist it is automatically created as a side effect
|
||||
* of calling this function, otherwise its last seen time is updated and
|
||||
* the existing consumer reference returned. */
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
|
||||
streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name));
|
||||
if (c == raxNotFound) {
|
||||
c = zmalloc(sizeof(*c));
|
||||
c->name = sdsdup(name);
|
||||
c->pel = raxNew();
|
||||
}
|
||||
c->seen_time = mstime();
|
||||
return c;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Consumer groups commands
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
Reference in New Issue
Block a user