Stream & AOF: rewrite stream in correct way

This commit is contained in:
zhaozhao.zz 2018-10-08 21:23:38 +08:00 committed by antirez
parent 6455274d1c
commit d22f1ef032

View File

@ -1139,6 +1139,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamID id; streamID id;
int64_t numfields; int64_t numfields;
if (s->length) {
/* Reconstruct the stream data using XADD commands. */ /* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is /* Emit a two elements array for each item. The first is
@ -1157,6 +1158,21 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
} }
} }
/* Append XSTREAM SETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
} else {
/* Using XSTREAM CREATE if the stream is empty. */
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
}
/* Create all the stream consumer groups. */ /* Create all the stream consumer groups. */
if (s->cgroups) { if (s->cgroups) {