Merge branch 'unstable' into modules_fork

This commit is contained in:
Salvatore Sanfilippo
2019-09-27 11:24:06 +02:00
committed by GitHub
82 changed files with 4524 additions and 871 deletions

264
src/rdb.c
View File

@ -42,31 +42,35 @@
#include <sys/stat.h>
#include <sys/param.h>
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
/* This macro is called when the internal RDB stracture is corrupt */
#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
/* This macro is called when RDB read failed (possibly a short read) */
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
void rdbCheckThenExit(int linenum, char *reason, ...) {
void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
va_list ap;
char msg[1024];
int len;
len = snprintf(msg,sizeof(msg),
"Internal error in RDB reading function at rdb.c:%d -> ", linenum);
"Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
(unsigned long long)server.loading_loaded_bytes, linenum);
va_start(ap,reason);
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
if (!rdbCheckMode) {
serverLog(LL_WARNING, "%s", msg);
if (rdbFileBeingLoaded) {
if (rdbFileBeingLoaded || corruption_error) {
serverLog(LL_WARNING, "%s", msg);
char *argv[2] = {"",rdbFileBeingLoaded};
redis_check_rdb_main(2,argv,NULL);
} else {
serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
return;
}
} else {
@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
return len;
}
/* This is just a wrapper for the low level function rioRead() that will
* automatically abort if it is not possible to read the specified amount
* of bytes. */
void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
if (rioRead(rdb,buf,len) == 0) {
rdbExitReportCorruptRDB(
"Impossible to read %llu bytes in rdbLoadRaw()",
(unsigned long long) len);
return; /* Not reached. */
}
}
int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1);
}
@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) {
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
* opcode. */
* opcode. On error -1 is returned, however this could be a valid time, so
* to check for loading errors the caller should call rioGetReadError() after
* calling this function. */
time_t rdbLoadTime(rio *rdb) {
int32_t t32;
rdbLoadRaw(rdb,&t32,4);
if (rioRead(rdb,&t32,4) == 0) return -1;
return (time_t)t32;
}
@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
* after upgrading to Redis version 5 they will no longer be able to load their
* own old RDB files. Because of that, we instead fix the function only for new
* RDB versions, and load older RDB versions as we used to do in the past,
* allowing big endian systems to load their own old RDB files. */
* allowing big endian systems to load their own old RDB files.
*
* On I/O error the function returns LLONG_MAX, however if this is also a
* valid stored value, the caller should use rioGetReadError() to check for
* errors after calling this function. */
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
int64_t t64;
rdbLoadRaw(rdb,&t64,8);
if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
if (rdbver >= 9) /* Check the top comment of this function. */
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
return (long long)t64;
@ -262,7 +260,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) {
/* Loads an integer-encoded object with the specified encoding type "enctype".
* The returned value changes according to the flags, see
* rdbGenerincLoadStringObject() for more info. */
* rdbGenericLoadStringObject() for more info. */
void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
val = (int32_t)v;
} else {
val = 0; /* anti-warning */
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
return NULL; /* Never reached. */
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
/* Load the compressed representation and uncompress it to target. */
if (rioRead(rdb,c,clen) == 0) goto err;
if (lzf_decompress(c,clen,val,len) == 0) {
if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
goto err;
rdbExitReportCorruptRDB("Invalid LZF compressed string");
}
zfree(c);
@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
return rdbLoadLzfStringObject(rdb,flags,lenptr);
default:
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
return NULL; /* Never reached. */
}
}
@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
moduleInitIOContext(io,mt,rdb,key);
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1) return -1;
io.bytes += retval;
if (retval == -1)
io.error = 1;
else
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
@ -1103,6 +1103,45 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
/* Save a module-specific aux value. */
RedisModuleIO io;
int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
io.bytes += retval;
/* write the 'when' so that we can provide it on loading. add a UINT opcode
* for backwards compatibility, everything after the MT needs to be prefixed
* by an opcode. */
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT);
if (retval == -1) return -1;
io.bytes += retval;
retval = rdbSaveLen(rdb,when);
if (retval == -1) return -1;
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
moduleInitIOContext(io,mt,rdb,NULL);
mt->aux_save(&io,when);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1)
io.error = 1;
else
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
if (io.error)
return -1;
return io.bytes;
}
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@ -1124,6 +1163,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@ -1185,6 +1225,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@ -1628,6 +1670,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
default:
/* totally unreachable */
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
@ -1635,6 +1678,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
if (listpacks == RDB_LENERR) {
rdbReportReadError("Stream listpacks len loading failed.");
decrRefCount(o);
return NULL;
}
while(listpacks--) {
/* Get the master ID, the one we'll use as key of the radix tree
@ -1642,7 +1690,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* relatively to this ID. */
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (nodekey == NULL) {
rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
decrRefCount(o);
return NULL;
}
if (sdslen(nodekey) != sizeof(streamID)) {
rdbExitReportCorruptRDB("Stream node key entry is not the "
@ -1652,7 +1702,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Load the listpack. */
unsigned char *lp =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
if (lp == NULL) return NULL;
if (lp == NULL) {
rdbReportReadError("Stream listpacks loading failed.");
sdsfree(nodekey);
decrRefCount(o);
return NULL;
}
unsigned char *first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
@ -1670,12 +1725,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
/* Load total number of items inside the stream. */
s->length = rdbLoadLen(rdb,NULL);
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream object metadata loading failed.");
decrRefCount(o);
return NULL;
}
/* Consumer groups loading */
size_t cgroups_count = rdbLoadLen(rdb,NULL);
uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
if (cgroups_count == RDB_LENERR) {
rdbReportReadError("Stream cgroup count loading failed.");
decrRefCount(o);
return NULL;
}
while(cgroups_count--) {
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
@ -1683,11 +1750,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
streamID cg_id;
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cgname == NULL) {
rdbExitReportCorruptRDB(
rdbReportReadError(
"Error reading the consumer group name from Stream");
decrRefCount(o);
return NULL;
}
cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream cgroup ID loading failed.");
sdsfree(cgname);
decrRefCount(o);
return NULL;
}
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
@ -1699,13 +1776,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* owner, since consumers for this group and their messages will
* be read as a next step. So for now leave them not resolved
* and later populate it. */
size_t pel_size = rdbLoadLen(rdb,NULL);
uint64_t pel_size = rdbLoadLen(rdb,NULL);
if (pel_size == RDB_LENERR) {
rdbReportReadError("Stream PEL size loading failed.");
decrRefCount(o);
return NULL;
}
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
rdbReportReadError("Stream PEL ID loading failed.");
decrRefCount(o);
return NULL;
}
streamNACK *nack = streamCreateNACK(NULL);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream PEL NACK loading failed.");
decrRefCount(o);
streamFreeNACK(nack);
return NULL;
}
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
"loading stream consumer group");
@ -1713,24 +1805,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
size_t consumers_num = rdbLoadLen(rdb,NULL);
uint64_t consumers_num = rdbLoadLen(rdb,NULL);
if (consumers_num == RDB_LENERR) {
rdbReportReadError("Stream consumers num loading failed.");
decrRefCount(o);
return NULL;
}
while(consumers_num--) {
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cname == NULL) {
rdbExitReportCorruptRDB(
"Error reading the consumer name from Stream group");
rdbReportReadError(
"Error reading the consumer name from Stream group.");
decrRefCount(o);
return NULL;
}
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream short read reading seen time.");
decrRefCount(o);
return NULL;
}
/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);
if (pel_size == RDB_LENERR) {
rdbReportReadError(
"Stream consumer PEL num loading failed.");
decrRefCount(o);
return NULL;
}
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
rdbReportReadError(
"Stream short read reading PEL streamID.");
decrRefCount(o);
return NULL;
}
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
if (nack == raxNotFound)
rdbExitReportCorruptRDB("Consumer entry not found in "
@ -1749,6 +1864,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) return NULL;
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
@ -1776,6 +1892,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Module v2 serialization has an EOF mark at the end. */
if (io.ver == 2) {
uint64_t eof = rdbLoadLen(rdb,NULL);
if (eof == RDB_LENERR) {
o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
decrRefCount(o);
return NULL;
}
if (eof != RDB_MODULE_OPCODE_EOF) {
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
exit(1);
@ -1789,7 +1910,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
o = createModuleObject(mt,ptr);
} else {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
return NULL;
}
return o;
}
@ -1888,11 +2010,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* load the actual type, and continue. */
expiretime = rdbLoadTime(rdb);
expiretime *= 1000;
if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
@ -1993,15 +2117,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(auxval);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_MODULE_AUX) {
/* This is just for compatibility with the future: we have plans
* to add the ability for modules to store anything in the RDB
* file, like data that is not related to the Redis key space.
* Such data will potentially be stored both before and after the
* RDB keys-values section. For this reason since RDB version 9,
* we have the ability to read a MODULE_AUX opcode followed by an
* identifier of the module, and a serialized value in "MODULE V2"
* format. */
/* Load module data that is not related to the Redis key space.
* Such data can be potentially be stored both before and after the
* RDB keys-values section. */
uint64_t moduleid = rdbLoadLen(rdb,NULL);
int when_opcode = rdbLoadLen(rdb,NULL);
int when = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) goto eoferr;
if (when_opcode != RDB_MODULE_OPCODE_UINT)
rdbReportReadError("bad when_opcode");
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
moduleTypeNameByID(name,moduleid);
@ -2011,14 +2135,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
/* This version of Redis actually does not know what to do
* with modules AUX data... */
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
exit(1);
if (!mt->aux_load) {
/* Module doesn't support AUX. */
serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
exit(1);
}
RedisModuleIO io;
moduleInitIOContext(io,mt,rdb,NULL);
io.ver = 2;
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
moduleTypeNameByID(name,moduleid);
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
exit(1);
}
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
uint64_t eof = rdbLoadLen(rdb,NULL);
if (eof != RDB_MODULE_OPCODE_EOF) {
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
exit(1);
}
continue;
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
decrRefCount(aux);
continue; /* Read next opcode. */
}
}
@ -2072,10 +2219,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
}
return C_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
return C_ERR; /* Just to avoid warning */
/* Unexpected end of file is handled here calling rdbReportReadError():
* this will in turn either abort Redis in most cases, or if we are loading
* the RDB file from a socket during initial SYNC (diskless replica mode),
* we'll report the error to the caller, so that we can retry. */
eoferr:
serverLog(LL_WARNING,
"Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbReportReadError("Unexpected EOF reading RDB file");
return C_ERR;
}
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The