mirror of
https://github.com/fluencelabs/redis
synced 2025-06-14 09:41:21 +00:00
Blocking XREAD[GROUP] should always reply with valid data (or timeout)
This commit solves the following bug: 127.0.0.1:6379> XGROUP CREATE x grp $ MKSTREAM OK 127.0.0.1:6379> XADD x 666 f v "666-0" 127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x > 1) 1) "x" 2) 1) 1) "666-0" 2) 1) "f" 2) "v" 127.0.0.1:6379> XADD x 667 f v "667-0" 127.0.0.1:6379> XDEL x 667 (integer) 1 127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x > 1) 1) "x" 2) (empty array) The root cause is that we use s->last_id in streamCompareID while we should use the last *valid* ID
This commit is contained in:
@ -782,6 +782,16 @@ int streamDeleteItem(stream *s, streamID *id) {
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/* Get the last valid (non-tombstone) streamID of 's'. */
|
||||
void streamLastValidID(stream *s, streamID *maxid)
|
||||
{
|
||||
streamIterator si;
|
||||
streamIteratorStart(&si,s,NULL,NULL,1);
|
||||
int64_t numfields;
|
||||
streamIteratorGetID(&si,maxid,&numfields);
|
||||
streamIteratorStop(&si);
|
||||
}
|
||||
|
||||
/* Emit a reply in the client output buffer by formatting a Stream ID
|
||||
* in the standard <ms>-<seq> format, using the simple string protocol
|
||||
* of REPL. */
|
||||
@ -1484,20 +1494,23 @@ void xreadCommand(client *c) {
|
||||
{
|
||||
serve_synchronously = 1;
|
||||
serve_history = 1;
|
||||
} else {
|
||||
} else if (s->length) {
|
||||
/* We also want to serve a consumer in a consumer group
|
||||
* synchronously in case the group top item delivered is smaller
|
||||
* than what the stream has inside. */
|
||||
streamID *last = &groups[i]->last_id;
|
||||
if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
|
||||
streamID maxid, *last = &groups[i]->last_id;
|
||||
streamLastValidID(s, &maxid);
|
||||
if (streamCompareID(&maxid, last) > 0) {
|
||||
serve_synchronously = 1;
|
||||
*gt = *last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if (s->length) {
|
||||
/* For consumers without a group, we serve synchronously if we can
|
||||
* actually provide at least one item from the stream. */
|
||||
if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
|
||||
streamID maxid;
|
||||
streamLastValidID(s, &maxid);
|
||||
if (streamCompareID(&maxid, gt) > 0) {
|
||||
serve_synchronously = 1;
|
||||
}
|
||||
}
|
||||
@ -1849,11 +1862,7 @@ void xsetidCommand(client *c) {
|
||||
* item, otherwise the fundamental ID monotonicity assumption is violated. */
|
||||
if (s->length > 0) {
|
||||
streamID maxid;
|
||||
streamIterator si;
|
||||
streamIteratorStart(&si,s,NULL,NULL,1);
|
||||
int64_t numfields;
|
||||
streamIteratorGetID(&si,&maxid,&numfields);
|
||||
streamIteratorStop(&si);
|
||||
streamLastValidID(s,&maxid);
|
||||
|
||||
if (streamCompareID(&id,&maxid) < 0) {
|
||||
addReplyError(c,"The ID specified in XSETID is smaller than the "
|
||||
|
Reference in New Issue
Block a user