mirror of
https://github.com/fluencelabs/redis
synced 2025-06-16 10:41:22 +00:00
Merge pull request #6264 from oranagra/modules_api_aux_rdb
Implement module api for aux data in rdb
This commit is contained in:
41
src/module.c
41
src/module.c
@ -29,6 +29,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "rdb.h"
|
||||
#include <dlfcn.h>
|
||||
|
||||
#define REDISMODULE_CORE 1
|
||||
@ -3078,6 +3079,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
|
||||
moduleTypeMemUsageFunc mem_usage;
|
||||
moduleTypeDigestFunc digest;
|
||||
moduleTypeFreeFunc free;
|
||||
struct {
|
||||
moduleTypeAuxLoadFunc aux_load;
|
||||
moduleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
} v2;
|
||||
} *tms = (struct typemethods*) typemethods_ptr;
|
||||
|
||||
moduleType *mt = zcalloc(sizeof(*mt));
|
||||
@ -3089,6 +3095,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
|
||||
mt->mem_usage = tms->mem_usage;
|
||||
mt->digest = tms->digest;
|
||||
mt->free = tms->free;
|
||||
if (tms->version >= 2) {
|
||||
mt->aux_load = tms->v2.aux_load;
|
||||
mt->aux_save = tms->v2.aux_save;
|
||||
mt->aux_save_triggers = tms->v2.aux_save_triggers;
|
||||
}
|
||||
memcpy(mt->name,name,sizeof(mt->name));
|
||||
listAddNodeTail(ctx->module->types,mt);
|
||||
return mt;
|
||||
@ -3355,6 +3366,36 @@ loaderr:
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
|
||||
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
|
||||
* who asked for it. */
|
||||
ssize_t rdbSaveModulesAux(rio *rdb, int when) {
|
||||
size_t total_written = 0;
|
||||
dictIterator *di = dictGetIterator(modules);
|
||||
dictEntry *de;
|
||||
|
||||
while ((de = dictNext(di)) != NULL) {
|
||||
struct RedisModule *module = dictGetVal(de);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
listRewind(module->types,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
moduleType *mt = ln->value;
|
||||
if (!mt->aux_save || !(mt->aux_save_triggers & when))
|
||||
continue;
|
||||
ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
|
||||
if (ret==-1) {
|
||||
dictReleaseIterator(di);
|
||||
return -1;
|
||||
}
|
||||
total_written += ret;
|
||||
}
|
||||
}
|
||||
|
||||
dictReleaseIterator(di);
|
||||
return total_written;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Key digest API (DEBUG DIGEST interface for modules types)
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
87
src/rdb.c
87
src/rdb.c
@ -971,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
RedisModuleIO io;
|
||||
moduleValue *mv = o->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
|
||||
/* Write the "module" identifier as prefix, so that we'll be able
|
||||
* to call the right module during loading. */
|
||||
@ -980,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
io.bytes += retval;
|
||||
|
||||
/* Then write the module-specific representation + EOF marker. */
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
mt->rdb_save(&io,mv->value);
|
||||
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
if (retval == -1)
|
||||
io.error = 1;
|
||||
else
|
||||
io.bytes += retval;
|
||||
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
@ -1101,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
|
||||
/* Save a module-specific aux value. */
|
||||
RedisModuleIO io;
|
||||
int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
|
||||
|
||||
/* Write the "module" identifier as prefix, so that we'll be able
|
||||
* to call the right module during loading. */
|
||||
retval = rdbSaveLen(rdb,mt->id);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
/* write the 'when' so that we can provide it on loading */
|
||||
retval = rdbSaveLen(rdb,when);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
/* Then write the module-specific representation + EOF marker. */
|
||||
moduleInitIOContext(io,mt,rdb,NULL);
|
||||
mt->aux_save(&io,when);
|
||||
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
|
||||
if (retval == -1)
|
||||
io.error = 1;
|
||||
else
|
||||
io.bytes += retval;
|
||||
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
zfree(io.ctx);
|
||||
}
|
||||
if (io.error)
|
||||
return -1;
|
||||
return io.bytes;
|
||||
}
|
||||
|
||||
/* Produces a dump of the database in RDB format sending it to the specified
|
||||
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
|
||||
* is returned and part of the output, or all the output, can be
|
||||
@ -1122,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
|
||||
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
|
||||
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
|
||||
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
|
||||
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
|
||||
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
redisDb *db = server.db+j;
|
||||
@ -1183,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
|
||||
di = NULL; /* So that we don't release it again on error. */
|
||||
}
|
||||
|
||||
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
|
||||
|
||||
/* EOF opcode */
|
||||
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
|
||||
|
||||
@ -2089,15 +2128,11 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
decrRefCount(auxval);
|
||||
continue; /* Read type again. */
|
||||
} else if (type == RDB_OPCODE_MODULE_AUX) {
|
||||
/* This is just for compatibility with the future: we have plans
|
||||
* to add the ability for modules to store anything in the RDB
|
||||
* file, like data that is not related to the Redis key space.
|
||||
* Such data will potentially be stored both before and after the
|
||||
* RDB keys-values section. For this reason since RDB version 9,
|
||||
* we have the ability to read a MODULE_AUX opcode followed by an
|
||||
* identifier of the module, and a serialized value in "MODULE V2"
|
||||
* format. */
|
||||
/* Load module data that is not related to the Redis key space.
|
||||
* Such data can be potentially be stored both before and after the
|
||||
* RDB keys-values section. */
|
||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||
int when = rdbLoadLen(rdb,NULL);
|
||||
if (rioGetReadError(rdb)) goto eoferr;
|
||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||
char name[10];
|
||||
@ -2108,10 +2143,32 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
|
||||
exit(1);
|
||||
} else if (!rdbCheckMode && mt != NULL) {
|
||||
/* This version of Redis actually does not know what to do
|
||||
* with modules AUX data... */
|
||||
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
|
||||
exit(1);
|
||||
if (!mt->aux_load) {
|
||||
/* Module doesn't support AUX. */
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
RedisModuleIO io;
|
||||
moduleInitIOContext(io,mt,rdb,NULL);
|
||||
io.ver = 2;
|
||||
/* Call the rdb_load method of the module providing the 10 bit
|
||||
* encoding version in the lower 10 bits of the module ID. */
|
||||
if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
|
||||
moduleTypeNameByID(name,moduleid);
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
|
||||
exit(1);
|
||||
}
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
zfree(io.ctx);
|
||||
}
|
||||
uint64_t eof = rdbLoadLen(rdb,NULL);
|
||||
if (eof != RDB_MODULE_OPCODE_EOF) {
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
||||
exit(1);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
/* RDB check mode. */
|
||||
robj *aux = rdbLoadCheckModuleValue(rdb,name);
|
||||
|
@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
|
||||
robj *rdbLoadObject(int type, rio *rdb, robj *key);
|
||||
void backgroundSaveDoneHandler(int exitcode, int bysignal);
|
||||
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
|
||||
robj *rdbLoadStringObject(rio *rdb);
|
||||
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
|
||||
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
|
||||
|
@ -129,6 +129,10 @@
|
||||
|
||||
#define REDISMODULE_NOT_USED(V) ((void) V)
|
||||
|
||||
/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */
|
||||
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
|
||||
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
|
||||
|
||||
/* This type represents a timer handle, and is returned when a timer is
|
||||
* registered and used in order to invalidate a timer. It's just a 64 bit
|
||||
* number, because this is how each timer is represented inside the radix tree
|
||||
@ -166,6 +170,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
|
||||
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
||||
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
||||
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
|
||||
typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
|
||||
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
||||
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
|
||||
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
|
||||
@ -174,7 +180,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha
|
||||
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
|
||||
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
||||
|
||||
#define REDISMODULE_TYPE_METHOD_VERSION 1
|
||||
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
||||
typedef struct RedisModuleTypeMethods {
|
||||
uint64_t version;
|
||||
RedisModuleTypeLoadFunc rdb_load;
|
||||
@ -183,6 +189,9 @@ typedef struct RedisModuleTypeMethods {
|
||||
RedisModuleTypeMemUsageFunc mem_usage;
|
||||
RedisModuleTypeDigestFunc digest;
|
||||
RedisModuleTypeFreeFunc free;
|
||||
RedisModuleTypeAuxLoadFunc aux_load;
|
||||
RedisModuleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
} RedisModuleTypeMethods;
|
||||
|
||||
#define REDISMODULE_GET_API(name) \
|
||||
|
10
src/server.h
10
src/server.h
@ -537,6 +537,10 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK)
|
||||
#define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS)
|
||||
|
||||
/* Bit flags for moduleTypeAuxSaveFunc */
|
||||
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
|
||||
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
|
||||
|
||||
struct RedisModule;
|
||||
struct RedisModuleIO;
|
||||
struct RedisModuleDigest;
|
||||
@ -549,6 +553,8 @@ struct redisObject;
|
||||
* is deleted. */
|
||||
typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
|
||||
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
|
||||
typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when);
|
||||
typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when);
|
||||
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
|
||||
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
|
||||
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
|
||||
@ -565,6 +571,9 @@ typedef struct RedisModuleType {
|
||||
moduleTypeMemUsageFunc mem_usage;
|
||||
moduleTypeDigestFunc digest;
|
||||
moduleTypeFreeFunc free;
|
||||
moduleTypeAuxLoadFunc aux_load;
|
||||
moduleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
|
||||
} moduleType;
|
||||
|
||||
@ -1530,6 +1539,7 @@ void moduleAcquireGIL(void);
|
||||
void moduleReleaseGIL(void);
|
||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||
void moduleCallCommandFilters(client *c);
|
||||
ssize_t rdbSaveModulesAux(rio *rdb, int when);
|
||||
|
||||
/* Utils */
|
||||
long long ustime(void);
|
||||
|
Reference in New Issue
Block a user