Merge branch 'unstable' into module-long-double

This commit is contained in:
Salvatore Sanfilippo
2019-11-19 12:15:45 +01:00
committed by GitHub
29 changed files with 1223 additions and 124 deletions

View File

@ -1412,7 +1412,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyBulkCBuffer(c, "", 0);
addReply(c,shared.emptybulk);
return REDISMODULE_OK;
}
@ -1427,8 +1427,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
return REDISMODULE_OK;
}
/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
* as the string "$-1\r\n".
/* Reply to the client with a NULL.
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
@ -1787,6 +1786,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
* * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
* reaching the maxmemory level.
*
* * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF
*
* * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master.
*
* * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to
@ -1886,6 +1887,18 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
}
/* Initialize a RedisModuleKey struct */
static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
kp->ctx = ctx;
kp->db = ctx->client->db;
kp->key = keyname;
incrRefCount(keyname);
kp->value = value;
kp->iter = NULL;
kp->mode = mode;
zsetKeyReset(kp);
}
/* Return an handle representing a Redis key, so that it is possible
* to call other APIs with the key handle as argument to perform
* operations on the key.
@ -1916,27 +1929,25 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Setup the key handle. */
kp = zmalloc(sizeof(*kp));
kp->ctx = ctx;
kp->db = ctx->client->db;
kp->key = keyname;
incrRefCount(keyname);
kp->value = value;
kp->iter = NULL;
kp->mode = mode;
zsetKeyReset(kp);
moduleInitKey(kp, ctx, keyname, value, mode);
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
return (void*)kp;
}
/* Close a key handle. */
void RM_CloseKey(RedisModuleKey *key) {
if (key == NULL) return;
/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
decrRefCount(key->key);
}
/* Close a key handle. */
void RM_CloseKey(RedisModuleKey *key) {
if (key == NULL) return;
moduleCloseKey(key);
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
zfree(key);
}
@ -1988,7 +1999,7 @@ int RM_DeleteKey(RedisModuleKey *key) {
return REDISMODULE_OK;
}
/* If the key is open for writing, unlink it (that is delete it in a
/* If the key is open for writing, unlink it (that is delete it in a
* non-blocking way, not reclaiming memory immediately) and setup the key to
* accept new writes as an empty key (that will be created on demand).
* On success REDISMODULE_OK is returned. If the key is not open for
@ -3148,7 +3159,9 @@ fmterr:
* On success a RedisModuleCallReply object is returned, otherwise
* NULL is returned and errno is set to the following values:
*
* EINVAL: command non existing, wrong arity, wrong format specifier.
* EBADF: wrong format specifier.
* EINVAL: wrong command arity.
* ENOENT: command does not exist.
* EPERM: operation in Cluster instance with key in non local slot.
*
* This API is documented here: https://redis.io/topics/modules-intro
@ -3180,7 +3193,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* We handle the above format error only when the client is setup so that
* we can free it normally. */
if (argv == NULL) {
errno = EINVAL;
errno = EBADF;
goto cleanup;
}
@ -3192,7 +3205,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
*/
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
errno = EINVAL;
errno = ENOENT;
goto cleanup;
}
c->cmd = c->lastcmd = cmd;
@ -3922,6 +3935,59 @@ void RM_DigestEndSequence(RedisModuleDigest *md) {
memset(md->o,0,sizeof(md->o));
}
/* Decode a serialized representation of a module data type 'mt' from string
* 'str' and return a newly allocated value, or NULL if decoding failed.
*
* This call basically reuses the 'rdb_load' callback which module data types
* implement in order to allow a module to arbitrarily serialize/de-serialize
* keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
*
* Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and
* make sure the de-serialization code properly checks and handles IO errors
* (freeing allocated buffers and returning a NULL).
*
* If this is NOT done, Redis will handle corrupted (or just truncated) serialized
* data by producing an error message and terminating the process.
*/
void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) {
rio payload;
RedisModuleIO io;
rioInitWithBuffer(&payload, str->ptr);
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
/* All RM_Save*() calls always write a version 2 compatible format, so we
* need to make sure we read the same.
*/
io.ver = 2;
return mt->rdb_load(&io,0);
}
/* Encode a module data type 'mt' value 'data' into serialized form, and return it
* as a newly allocated RedisModuleString.
*
* This call basically reuses the 'rdb_save' callback which module data types
* implement in order to allow a module to arbitrarily serialize/de-serialize
* keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
*/
RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) {
rio payload;
RedisModuleIO io;
rioInitWithBuffer(&payload,sdsempty());
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
mt->rdb_save(&io,data);
if (io.error) {
return NULL;
} else {
robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr);
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str);
return str;
}
}
/* --------------------------------------------------------------------------
* AOF API for modules data types
* -------------------------------------------------------------------------- */
@ -5929,6 +5995,239 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
return REDISMODULE_OK;
}
/* --------------------------------------------------------------------------
* Scanning keyspace and hashes
* -------------------------------------------------------------------------- */
typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
typedef struct {
RedisModuleCtx *ctx;
void* user_data;
RedisModuleScanCB fn;
} ScanCBData;
typedef struct RedisModuleScanCursor{
int cursor;
int done;
}RedisModuleScanCursor;
static void moduleScanCallback(void *privdata, const dictEntry *de) {
ScanCBData *data = privdata;
sds key = dictGetKey(de);
robj* val = dictGetVal(de);
RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key));
/* Setup the key handle. */
RedisModuleKey kp = {0};
moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
data->fn(data->ctx, keyname, &kp, data->user_data);
moduleCloseKey(&kp);
decrRefCount(keyname);
}
/* Create a new cursor to be used with RedisModule_Scan */
RedisModuleScanCursor *RM_ScanCursorCreate() {
RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor));
cursor->cursor = 0;
cursor->done = 0;
return cursor;
}
/* Restart an existing cursor. The keys will be rescanned. */
void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) {
cursor->cursor = 0;
cursor->done = 0;
}
/* Destroy the cursor struct. */
void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) {
zfree(cursor);
}
/* Scan api that allows a module to scan all the keys and value in the selected db.
*
* Callback for scan implementation.
* void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
* - ctx - the redis module context provided to for the scan.
* - keyname - owned by the caller and need to be retained if used after this function.
* - key - holds info on the key and value, it is provided as best effort, in some cases it might
* be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too).
* when it is provided, it is owned by the caller and will be free when the callback returns.
* - privdata - the user data provided to RedisModule_Scan.
*
* The way it should be used:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* while(RedisModule_Scan(ctx, c, callback, privateData));
* RedisModule_ScanCursorDestroy(c);
*
* It is also possible to use this API from another thread while the lock is acquired durring
* the actuall call to RM_Scan:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModule_ThreadSafeContextLock(ctx);
* while(RedisModule_Scan(ctx, c, callback, privateData)){
* RedisModule_ThreadSafeContextUnlock(ctx);
* // do some background job
* RedisModule_ThreadSafeContextLock(ctx);
* }
* RedisModule_ScanCursorDestroy(c);
*
* The function will return 1 if there are more elements to scan and 0 otherwise,
* possibly setting errno if the call failed.
* It is also possible to restart and existing cursor using RM_CursorRestart. */
int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) {
if (cursor->done) {
errno = ENOENT;
return 0;
}
int ret = 1;
ScanCBData data = { ctx, privdata, fn };
cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
errno = 0;
return ret;
}
typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
typedef struct {
RedisModuleKey *key;
void* user_data;
RedisModuleScanKeyCB fn;
} ScanKeyCBData;
static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;
if (o->type == OBJ_SET) {
value = NULL;
} else if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else if (o->type == OBJ_ZSET) {
double *val = (double*)dictGetVal(de);
value = createStringObjectFromLongDouble(*val, 0);
}
data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}
/* Scan api that allows a module to scan the elements in a hash, set or sorted set key
*
* Callback for scan implementation.
* void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata);
* - key - the redis key context provided to for the scan.
* - field - field name, owned by the caller and need to be retained if used
* after this function.
* - value - value string or NULL for set type, owned by the caller and need to
* be retained if used after this function.
* - privdata - the user data provided to RedisModule_ScanKey.
*
* The way it should be used:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModuleKey *key = RedisModule_OpenKey(...)
* while(RedisModule_ScanKey(key, c, callback, privateData));
* RedisModule_CloseKey(key);
* RedisModule_ScanCursorDestroy(c);
*
* It is also possible to use this API from another thread while the lock is acquired durring
* the actuall call to RM_Scan, and re-opening the key each time:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModule_ThreadSafeContextLock(ctx);
* RedisModuleKey *key = RedisModule_OpenKey(...)
* while(RedisModule_ScanKey(ctx, c, callback, privateData)){
* RedisModule_CloseKey(key);
* RedisModule_ThreadSafeContextUnlock(ctx);
* // do some background job
* RedisModule_ThreadSafeContextLock(ctx);
* RedisModuleKey *key = RedisModule_OpenKey(...)
* }
* RedisModule_CloseKey(key);
* RedisModule_ScanCursorDestroy(c);
*
* The function will return 1 if there are more elements to scan and 0 otherwise,
* possibly setting errno if the call failed.
* It is also possible to restart and existing cursor using RM_CursorRestart. */
int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) {
if (key == NULL || key->value == NULL) {
errno = EINVAL;
return 0;
}
dict *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HT)
ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT)
ht = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST)
ht = ((zset *)o->ptr)->dict;
} else {
errno = EINVAL;
return 0;
}
if (cursor->done) {
errno = ENOENT;
return 0;
}
int ret = 1;
if (ht) {
ScanKeyCBData data = { key, privdata, fn };
cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) {
int pos = 0;
int64_t ll;
while(intsetGet(o->ptr,pos++,&ll)) {
robj *field = createStringObjectFromLongLong(ll);
fn(key, field, NULL, privdata);
decrRefCount(field);
}
cursor->cursor = 1;
cursor->done = 1;
ret = 0;
} else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
unsigned char *p = ziplistIndex(o->ptr,0);
unsigned char *vstr;
unsigned int vlen;
long long vll;
while(p) {
ziplistGet(p,&vstr,&vlen,&vll);
robj *field = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLong(vll);
p = ziplistNext(o->ptr,p);
ziplistGet(p,&vstr,&vlen,&vll);
robj *value = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLong(vll);
fn(key, field, value, privdata);
p = ziplistNext(o->ptr,p);
decrRefCount(field);
decrRefCount(value);
}
cursor->cursor = 1;
cursor->done = 1;
ret = 0;
}
errno = 0;
return ret;
}
/* --------------------------------------------------------------------------
* Module fork API
* -------------------------------------------------------------------------- */
@ -6774,38 +7073,82 @@ size_t moduleCount(void) {
return dictSize(modules);
}
/* Set the key LRU/LFU depending on server.maxmemory_policy.
* The lru_idle arg is idle time in seconds, and is only relevant if the
* eviction policy is LRU based.
* The lfu_freq arg is a logarithmic counter that provides an indication of
* the access frequencyonly (must be <= 255) and is only relevant if the
* eviction policy is LFU based.
* Either or both of them may be <0, in that case, nothing is set. */
/* return value is an indication if the lru field was updated or not. */
int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) {
/* Set the key last access time for LRU based eviction. not relevent if the
* servers's maxmemory policy is LFU based. Value is idle time in milliseconds.
* returns REDISMODULE_OK if the LRU was updated, REDISMODULE_ERR otherwise. */
int RM_SetLRU(RedisModuleKey *key, mstime_t lru_idle) {
if (!key->value)
return REDISMODULE_ERR;
if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0))
if (objectSetLRUOrLFU(key->value, -1, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0, 1))
return REDISMODULE_OK;
return REDISMODULE_ERR;
}
/* Gets the key LRU or LFU (depending on the current eviction policy).
* One will be set to the appropiate return value, and the other will be set to -1.
* see RedisModule_SetLRUOrLFU for units and ranges.
* return value is an indication of success. */
int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) {
*lru_idle = *lfu_freq = -1;
/* Gets the key last access time.
* Value is idletime in milliseconds or -1 if the server's eviction policy is
* LFU based.
* returns REDISMODULE_OK if when key is valid. */
int RM_GetLRU(RedisModuleKey *key, mstime_t *lru_idle) {
*lru_idle = -1;
if (!key->value)
return REDISMODULE_ERR;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
*lfu_freq = LFUDecrAndReturn(key->value);
} else {
*lru_idle = estimateObjectIdleTime(key->value)/1000;
}
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
return REDISMODULE_OK;
*lru_idle = estimateObjectIdleTime(key->value);
return REDISMODULE_OK;
}
/* Set the key access frequency. only relevant if the server's maxmemory policy
* is LFU based.
* The frequency is a logarithmic counter that provides an indication of
* the access frequencyonly (must be <= 255).
* returns REDISMODULE_OK if the LFU was updated, REDISMODULE_ERR otherwise. */
int RM_SetLFU(RedisModuleKey *key, long long lfu_freq) {
if (!key->value)
return REDISMODULE_ERR;
if (objectSetLRUOrLFU(key->value, lfu_freq, -1, 0, 1))
return REDISMODULE_OK;
return REDISMODULE_ERR;
}
/* Gets the key access frequency or -1 if the server's eviction policy is not
* LFU based.
* returns REDISMODULE_OK if when key is valid. */
int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
*lfu_freq = -1;
if (!key->value)
return REDISMODULE_ERR;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
*lfu_freq = LFUDecrAndReturn(key->value);
return REDISMODULE_OK;
}
/* Replace the value assigned to a module type.
*
* The key must be open for writing, have an existing value, and have a moduleType
* that matches the one specified by the caller.
*
* Unlike RM_ModuleTypeSetValue() which will free the old value, this function
* simply swaps the old value with the new value.
*
* The function returns the old value, or NULL if any of the above conditions is
* not met.
*/
void *RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_value) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter)
return NULL;
if (!key->value || key->value->type != OBJ_MODULE)
return NULL;
moduleValue *mv = key->value->ptr;
if (mv->type != mt)
return NULL;
void *old_val = mv->value;
mv->value = new_value;
return old_val;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@ -6898,6 +7241,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(PoolAlloc);
REGISTER_API(CreateDataType);
REGISTER_API(ModuleTypeSetValue);
REGISTER_API(ModuleTypeReplaceValue);
REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue);
REGISTER_API(IsIOError);
@ -6917,6 +7261,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(LoadFloat);
REGISTER_API(SaveLongDouble);
REGISTER_API(LoadLongDouble);
REGISTER_API(SaveDataTypeToString);
REGISTER_API(LoadDataTypeFromString);
REGISTER_API(EmitAOF);
REGISTER_API(Log);
REGISTER_API(LogIOError);
@ -7007,9 +7353,16 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetClientInfoById);
REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
REGISTER_API(SetLRU);
REGISTER_API(GetLRU);
REGISTER_API(SetLFU);
REGISTER_API(GetLFU);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
REGISTER_API(ScanCursorCreate);
REGISTER_API(ScanCursorDestroy);
REGISTER_API(ScanCursorRestart);
REGISTER_API(Scan);
REGISTER_API(ScanKey);
}