Add RM_ScanKey to scan hash, set, zset, changes to RM_Scan API

- Adding RM_ScanKey
- Adding tests for RM_ScanKey
- Refactoring RM_Scan API

Changes in RM_Scan
- cleanup in docs and coding convention
- Moving out of experimantal Api
- Adding ctx to scan callback
- Dont use cursor of -1 as an indication of done (can be a valid cursor)
- Set errno when returning 0 for various reasons
- Rename Cursor to ScanCursor
- Test filters key that are not strings, and opens a key if NULL
This commit is contained in:
Oran Agra
2019-11-11 13:30:37 +02:00
committed by meir@redislabs.com
parent 11c6ce812a
commit 0f8692b464
4 changed files with 323 additions and 107 deletions

View File

@ -1848,7 +1848,8 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
}
static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
/* Initialize a RedisModuleKey struct */
static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
kp->ctx = ctx;
kp->db = ctx->client->db;
kp->key = keyname;
@ -1889,12 +1890,13 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Setup the key handle. */
kp = zmalloc(sizeof(*kp));
initializeKey(kp, ctx, keyname, value, mode);
moduleInitKey(kp, ctx, keyname, value, mode);
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
return (void*)kp;
}
static void closeKeyInternal(RedisModuleKey *key) {
/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->db,key->key);
@ -1906,7 +1908,7 @@ static void closeKeyInternal(RedisModuleKey *key) {
/* Close a key handle. */
void RM_CloseKey(RedisModuleKey *key) {
if (key == NULL) return;
closeKeyInternal(key);
moduleCloseKey(key);
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
zfree(key);
}
@ -5899,31 +5901,23 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
return REDISMODULE_OK;
}
/**
* Callback for scan implementation.
*
* The keyname is owned by the caller and need to be retained if used after this function.
*
* The kp is the data and provide using the best efforts approach, in some cases it might
* not be available (in such case it will be set to NULL) and it is the user responsibility
* to handle it.
*
* The kp (if given) is owned by the caller and will be free when the callback returns
*
*/
typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key);
/* --------------------------------------------------------------------------
* Scanning keyspace and hashes
* -------------------------------------------------------------------------- */
typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
typedef struct {
RedisModuleCtx *ctx;
void* user_data;
RedisModuleScanCB fn;
} ScanCBData;
typedef struct RedisModuleCursor{
typedef struct RedisModuleScanCursor{
int cursor;
}RedisModuleCursor;
int done;
}RedisModuleScanCursor;
void ScanCallback(void *privdata, const dictEntry *de) {
static void moduleScanCallback(void *privdata, const dictEntry *de) {
ScanCBData *data = privdata;
sds key = dictGetKey(de);
robj* val = dictGetVal(de);
@ -5931,69 +5925,211 @@ void ScanCallback(void *privdata, const dictEntry *de) {
/* Setup the key handle. */
RedisModuleKey kp = {0};
initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
data->fn(data->user_data, keyname, &kp);
data->fn(data->ctx, keyname, &kp, data->user_data);
closeKeyInternal(&kp);
moduleCloseKey(&kp);
decrRefCount(keyname);
}
/**
* Create a new cursor to scan keys.
*/
RedisModuleCursor* RM_CursorCreate() {
RedisModuleCursor* cursor = zmalloc(sizeof(*cursor));
/* Create a new cursor to be used with RedisModule_Scan */
RedisModuleScanCursor *RM_ScanCursorCreate() {
RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor));
cursor->cursor = 0;
cursor->done = 0;
return cursor;
}
/**
* Restart an existing cursor. The keys will be rescanned.
*/
void RM_CursorRestart(RedisModuleCursor* cursor) {
/* Restart an existing cursor. The keys will be rescanned. */
void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) {
cursor->cursor = 0;
cursor->done = 0;
}
/**
* Destroy the cursor struct.
*/
void RM_CursorDestroy(RedisModuleCursor* cursor) {
/* Destroy the cursor struct. */
void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) {
zfree(cursor);
}
/**
* Scan api that allows module writer to scan all the keys and value in redis.
/* Scan api that allows a module to scan all the keys and value in the selected db.
*
* Callback for scan implementation.
* void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
* - ctx - the redis module context provided to for the scan.
* - keyname - owned by the caller and need to be retained if used after this function.
* - key - holds info on the key and value, it is provided as best effort, in some cases it might
* be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too).
* when it is provided, it is owned by the caller and will be free when the callback returns.
* - privdata - the user data provided to RedisModule_Scan.
*
* The way it should be used:
* Cursor* c = RedisModule_CursorCreate();
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* while(RedisModule_Scan(ctx, c, callback, privateData));
* RedisModule_CursorDestroy(c);
* RedisModule_ScanCursorDestroy(c);
*
* It is also possible to use this api from another thread such that the GIL only have to
* be acquired durring the actuall call to RM_Scan:
* Cursor* c = RedisModule_CursorCreate();
* RedisModule_ThreadSafeCtxLock(ctx);
* It is also possible to use this API from another thread while the lock is acquired durring
* the actuall call to RM_Scan:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModule_ThreadSafeContextLock(ctx);
* while(RedisModule_Scan(ctx, c, callback, privateData)){
* RedisModule_ThreadSafeCtxUnlock(ctx);
* RedisModule_ThreadSafeContextUnlock(ctx);
* // do some background job
* RedisModule_ThreadSafeCtxLock(ctx);
* RedisModule_ThreadSafeContextLock(ctx);
* }
* RedisModule_CursorDestroy(c);
* RedisModule_ScanCursorDestroy(c);
*
* The function will return 1 if there is more elements to scan and 0 otherwise.
* It is also possible to restart and existing cursor using RM_CursorRestart
*/
int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) {
if(cursor->cursor == -1){
* The function will return 1 if there are more elements to scan and 0 otherwise,
* possibly setting errno if the call failed.
* It is also possible to restart and existing cursor using RM_CursorRestart. */
int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) {
if (cursor->done) {
errno = ENOENT;
return 0;
}
int ret = 1;
ScanCBData data = { ctx, privdata, fn };
cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data);
if (cursor->cursor == 0){
cursor->cursor = -1;
cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
errno = 0;
return ret;
}
typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
typedef struct {
RedisModuleKey *key;
void* user_data;
RedisModuleScanKeyCB fn;
} ScanKeyCBData;
static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;
if (o->type == OBJ_SET) {
value = NULL;
} else if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else if (o->type == OBJ_ZSET) {
double *val = (double*)dictGetVal(de);
value = createStringObjectFromLongDouble(*val, 0);
}
data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}
/* Scan api that allows a module to scan the elements in a hash, set or sorted set key
*
* Callback for scan implementation.
* void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata);
* - key - the redis key context provided to for the scan.
* - field - field name, owned by the caller and need to be retained if used
* after this function.
* - value - value string or NULL for set type, owned by the caller and need to
* be retained if used after this function.
* - privdata - the user data provided to RedisModule_ScanKey.
*
* The way it should be used:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModuleKey *key = RedisModule_OpenKey(...)
* while(RedisModule_ScanKey(key, c, callback, privateData));
* RedisModule_CloseKey(key);
* RedisModule_ScanCursorDestroy(c);
*
* It is also possible to use this API from another thread while the lock is acquired durring
* the actuall call to RM_Scan, and re-opening the key each time:
* RedisModuleCursor *c = RedisModule_ScanCursorCreate();
* RedisModule_ThreadSafeContextLock(ctx);
* RedisModuleKey *key = RedisModule_OpenKey(...)
* while(RedisModule_ScanKey(ctx, c, callback, privateData)){
* RedisModule_CloseKey(key);
* RedisModule_ThreadSafeContextUnlock(ctx);
* // do some background job
* RedisModule_ThreadSafeContextLock(ctx);
* RedisModuleKey *key = RedisModule_OpenKey(...)
* }
* RedisModule_CloseKey(key);
* RedisModule_ScanCursorDestroy(c);
*
* The function will return 1 if there are more elements to scan and 0 otherwise,
* possibly setting errno if the call failed.
* It is also possible to restart and existing cursor using RM_CursorRestart. */
int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) {
if (key == NULL || key->value == NULL) {
errno = EINVAL;
return 0;
}
dict *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HT)
ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT)
ht = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST)
ht = ((zset *)o->ptr)->dict;
} else {
errno = EINVAL;
return 0;
}
if (cursor->done) {
errno = ENOENT;
return 0;
}
int ret = 1;
if (ht) {
ScanKeyCBData data = { key, privdata, fn };
cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) {
int pos = 0;
int64_t ll;
while(intsetGet(o->ptr,pos++,&ll)) {
robj *field = createStringObjectFromLongLong(ll);
fn(key, field, NULL, privdata);
decrRefCount(field);
}
cursor->cursor = 1;
cursor->done = 1;
ret = 0;
} else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
unsigned char *p = ziplistIndex(o->ptr,0);
unsigned char *vstr;
unsigned int vlen;
long long vll;
while(p) {
ziplistGet(p,&vstr,&vlen,&vll);
robj *field = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLong(vll);
p = ziplistNext(o->ptr,p);
ziplistGet(p,&vstr,&vlen,&vll);
robj *value = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLong(vll);
fn(key, field, value, privdata);
p = ziplistNext(o->ptr,p);
decrRefCount(field);
decrRefCount(value);
}
cursor->cursor = 1;
cursor->done = 1;
ret = 0;
}
errno = 0;
return ret;
}
@ -7076,10 +7212,11 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(Scan);
REGISTER_API(CursorCreate);
REGISTER_API(CursorDestroy);
REGISTER_API(CursorRestart);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
REGISTER_API(ScanCursorCreate);
REGISTER_API(ScanCursorDestroy);
REGISTER_API(ScanCursorRestart);
REGISTER_API(Scan);
REGISTER_API(ScanKey);
}