CG: Replication WIP 1: XREADGROUP and XCLAIM propagated as XCLAIM.

This commit is contained in:
antirez
2018-03-19 14:16:13 +01:00
parent 3c2a952912
commit 0b58ad301e
5 changed files with 93 additions and 16 deletions

View File

@ -672,6 +672,44 @@ void addReplyStreamID(client *c, streamID *id) {
addReplySds(c,replyid);
}
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) {
return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
id->ms,id->seq));
}
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
* RETRYCOUNT <count> [FORCE]. */
robj *argv[11];
argv[0] = createStringObject("XCLAIM",6);
argv[1] = key;
argv[2] = group;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
argv[4] = createStringObjectFromLongLong(0);
argv[5] = id;
argv[6] = createStringObject("TIME",4);
argv[7] = createStringObjectFromLongLong(nack->delivery_time);
argv[8] = createStringObject("RETRYCOUNT",10);
argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[10] = createStringObject("FORCE",5);
propagate(server.xclaimCommand,c->db->id,argv,10,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
decrRefCount(argv[6]);
decrRefCount(argv[7]);
decrRefCount(argv[8]);
decrRefCount(argv[9]);
decrRefCount(argv[10]);
}
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
@ -695,6 +733,15 @@ void addReplyStreamID(client *c, streamID *id) {
* This is used when the function is just used in order
* to emit data and there is some higher level logic.
*
* The final argument 'spi' (stream propagatino info pointer) is a structure
* filled with information needed to propagte the command execution to AOF
* and slaves, in the case a consumer group was passed: we need to generate
* XCLAIM commands to create the pending list into AOF/slaves in that case.
*
* If 'spi' is set to NULL no propagation will happen even if the group was
* given, but currently such a feature is never used by the code base that
* will always pass 'spi' and propagate when a group is passed.
*
* Note that this function is recursive in certian cases. When it's called
* with a non NULL group and consumer argument, it may call
* streamReplyWithRangeFromConsumerPEL() in order to get entries from the
@ -706,7 +753,7 @@ void addReplyStreamID(client *c, streamID *id) {
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags) {
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
@ -763,6 +810,13 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
serverAssert(retval == 2); /* Make sure entry was inserted. */
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack);
decrRefCount(idarg);
}
}
arraylen++;
@ -802,7 +856,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES) == 0)
STREAM_RWR_RAWENTRIES,NULL) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
* about a message that's no longer here because was removed
@ -992,8 +1046,7 @@ void xaddCommand(client *c) {
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
robj *idarg = createObject(OBJ_STRING,
sdscatfmt(sdsempty(),"%U-%U",id.ms,id.seq));
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,i,idarg);
decrRefCount(idarg);
@ -1035,7 +1088,7 @@ void xrangeGenericCommand(client *c, int rev) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0);
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
}
/* XRANGE key start end [COUNT <n>] */
@ -1220,9 +1273,11 @@ void xreadCommand(client *c) {
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
consumername->ptr,1);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, noack);
consumer, noack, &spi);
if (groups) server.dirty++;
}
}
@ -1268,8 +1323,11 @@ void xreadCommand(client *c) {
addReply(c,shared.nullmultibulk);
/* Continue to cleanup... */
cleanup:
/* Cleanup. */
cleanup: /* Cleanup. */
/* The command is propagated (in the READGROUP form) as a side effect
* of calling lower level APIs. So stop any implicit propagation. */
preventCommandPropagation(c);
if (ids != static_ids) zfree(ids);
zfree(groups);
}
@ -1849,12 +1907,17 @@ void xclaimCommand(client *c) {
addReplyStreamID(c,&id);
} else {
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES);
STREAM_RWR_RAWENTRIES,NULL);
}
arraylen++;
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack);
server.dirty++;
}
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
preventCommandPropagation(c);
}
/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */
@ -1951,11 +2014,11 @@ NULL
end.ms = end.seq = UINT64_MAX;
addReplyStatus(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES);
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);
addReplyStatus(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES);
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);