Merge branch 'unstable' into rm_get_server_info

This commit is contained in:
Salvatore Sanfilippo
2019-11-21 10:06:15 +01:00
committed by GitHub
44 changed files with 2691 additions and 286 deletions

View File

@ -67,6 +67,12 @@ void freeStream(stream *s) {
zfree(s);
}
/* Return the length of a stream. */
unsigned long streamLength(const robj *subject) {
stream *s = subject->ptr;
return s->length;
}
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
@ -173,9 +179,19 @@ int streamCompareID(streamID *a, streamID *b) {
* C_ERR if an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
/* If an ID was given, check that it's greater than the last entry ID
* or return an error. */
if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
/* Generate the new entry ID. */
streamID id;
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
* overflow (and wrap-around) when incrementing the sequence
part. */
if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
/* Add the new entry. */
raxIterator ri;
@ -192,13 +208,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
}
raxStop(&ri);
/* Generate the new entry ID. */
streamID id;
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
@ -1197,6 +1206,14 @@ void xaddCommand(client *c) {
return;
}
/* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
* a new stream and have streamAppendItem fail, leaving an empty key in the
* database. */
if (id_given && id.ms == 0 && id.seq == 0) {
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
return;
}
/* Lookup the stream at key. */
robj *o;
stream *s;