Streams: use bulk replies instead of status replies.

They play better with Lua scripting, otherwise Lua will see status
replies as "ok" = "string" which is very odd, and actually as @oranagra
reasoned in issue #5456 in the rest of the Redis code base there was no
such concern as saving a few bytes when the protocol is emitted.
This commit is contained in:
antirez 2018-10-17 17:20:05 +02:00
parent bcc0916d08
commit 1eb0994ecc

View File

@ -777,8 +777,8 @@ int streamDeleteItem(stream *s, streamID *id) {
* in the standard <ms>-<seq> format, using the simple string protocol * in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */ * of REPL. */
void addReplyStreamID(client *c, streamID *id) { void addReplyStreamID(client *c, streamID *id) {
sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id->ms,id->seq); sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
addReplySds(c,replyid); addReplyBulkSds(c,replyid);
} }
/* Similar to the above function, but just creates an object, usually useful /* Similar to the above function, but just creates an object, usually useful
@ -2456,11 +2456,11 @@ NULL
if (idle < 0) idle = 0; if (idle < 0) idle = 0;
addReplyMultiBulkLen(c,6); addReplyMultiBulkLen(c,6);
addReplyStatus(c,"name"); addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
addReplyStatus(c,"pending"); addReplyBulkCString(c,"pending");
addReplyLongLong(c,raxSize(consumer->pel)); addReplyLongLong(c,raxSize(consumer->pel));
addReplyStatus(c,"idle"); addReplyBulkCString(c,"idle");
addReplyLongLong(c,idle); addReplyLongLong(c,idle);
} }
raxStop(&ri); raxStop(&ri);
@ -2478,28 +2478,28 @@ NULL
while(raxNext(&ri)) { while(raxNext(&ri)) {
streamCG *cg = ri.data; streamCG *cg = ri.data;
addReplyMultiBulkLen(c,8); addReplyMultiBulkLen(c,8);
addReplyStatus(c,"name"); addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyStatus(c,"consumers"); addReplyBulkCString(c,"consumers");
addReplyLongLong(c,raxSize(cg->consumers)); addReplyLongLong(c,raxSize(cg->consumers));
addReplyStatus(c,"pending"); addReplyBulkCString(c,"pending");
addReplyLongLong(c,raxSize(cg->pel)); addReplyLongLong(c,raxSize(cg->pel));
addReplyStatus(c,"last-delivered-id"); addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id); addReplyStreamID(c,&cg->last_id);
} }
raxStop(&ri); raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */ /* XINFO STREAM <key> (or the alias XINFO <key>). */
addReplyMultiBulkLen(c,14); addReplyMultiBulkLen(c,14);
addReplyStatus(c,"length"); addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length); addReplyLongLong(c,s->length);
addReplyStatus(c,"radix-tree-keys"); addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax)); addReplyLongLong(c,raxSize(s->rax));
addReplyStatus(c,"radix-tree-nodes"); addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes); addReplyLongLong(c,s->rax->numnodes);
addReplyStatus(c,"groups"); addReplyBulkCString(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
addReplyStatus(c,"last-generated-id"); addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id); addReplyStreamID(c,&s->last_id);
/* To emit the first/last entry we us the streamReplyWithRange() /* To emit the first/last entry we us the streamReplyWithRange()
@ -2508,11 +2508,11 @@ NULL
streamID start, end; streamID start, end;
start.ms = start.seq = 0; start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX; end.ms = end.seq = UINT64_MAX;
addReplyStatus(c,"first-entry"); addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL); STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk); if (!count) addReply(c,shared.nullbulk);
addReplyStatus(c,"last-entry"); addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL); STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk); if (!count) addReply(c,shared.nullbulk);