mirror of
https://github.com/fluencelabs/redis
synced 2025-06-20 04:26:31 +00:00
finished implementation of notifications. Tests unfinished
This commit is contained in:
137
src/module.c
137
src/module.c
@ -216,6 +216,25 @@ static list *moduleUnblockedClients;
|
||||
* allow thread safe contexts to execute commands at a safe moment. */
|
||||
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
|
||||
/* Function pointer type for keyspace event notification subscriptions from modules. */
|
||||
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||
|
||||
/* Keyspace notification subscriber information. See RM_SubscribeToKeyspaceEvents */
|
||||
typedef struct RedisModuleKeyspaceSubscriber {
|
||||
/* The module subscribed to the event */
|
||||
RedisModule *module;
|
||||
/* Notification callback in the module*/
|
||||
RedisModuleNotificationFunc notify_callback;
|
||||
/* A bit mask of the events the module is interested in */
|
||||
int event_mask;
|
||||
/* Active flag set on entry, to avoid reentrant subscribers calling themselves */
|
||||
int active;
|
||||
} RedisModuleKeyspaceSubscriber;
|
||||
|
||||
/* The module keyspace notification subscribers list */
|
||||
static list *moduleKeyspaceSubscribers;
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Prototypes
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -3669,6 +3688,114 @@ void moduleReleaseGIL(void) {
|
||||
pthread_mutex_unlock(&moduleGIL);
|
||||
}
|
||||
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Module Keyspace Notifications API
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Subscribe to keyspace notifications. This is a low-level version of the
|
||||
* keyspace-notifications API. A module cand register callbacks to be notified
|
||||
* when keyspce events occur.
|
||||
* Notification events are filtered by their type (string events, set events,
|
||||
* etc), and the subsriber callback receives only events that match a specific
|
||||
* mask of event types.
|
||||
* We do not distinguish between key events and keyspace events, and it is up
|
||||
* to the module to filter the actions taken based on the key.
|
||||
*
|
||||
* The subscriber signature is:
|
||||
*
|
||||
* int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
|
||||
* const char *event,
|
||||
* RedisModuleString *key);
|
||||
*
|
||||
* `type` is the event type bit, that must match the mask given at registration
|
||||
* time. The event string is the actual command being executed, and key is the
|
||||
* relevant Redis key.
|
||||
*
|
||||
* A notification callback gets executed with a redis context that can not be
|
||||
* used to send anything to the client, and has the db number where the event
|
||||
* occured as its selected db number.
|
||||
*
|
||||
* Notice that it is not necessary to enable norifications in redis.conf for
|
||||
* module notifications to work.
|
||||
*
|
||||
* Warning: the notification callbacks are performed in a synchronous manner,
|
||||
* so notification callbacks must to be fast, or they would slow Redis down.
|
||||
* If you need to take long actions, use threads to offload them.
|
||||
*
|
||||
* See https://redis.io/topics/notifications for more information.
|
||||
*/
|
||||
int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
|
||||
RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
|
||||
sub->module = ctx->module;
|
||||
sub->event_mask = types;
|
||||
sub->notify_callback = callback;
|
||||
sub->active = 0;
|
||||
|
||||
/* Let the notification system know that modules are interested in notifications */
|
||||
server.notify_keyspace_events |= NOTIFY_MODULE;
|
||||
listAddNodeTail(moduleKeyspaceSubscribers, sub);
|
||||
return REDISMODULE_OK;
|
||||
|
||||
}
|
||||
|
||||
/* Dispatcher for keyspace notifications to module subscriber functions.
|
||||
* This gets called only if at least one module requested to be notified on
|
||||
* keyspace notifications */
|
||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
listRewind(moduleKeyspaceSubscribers,&li);
|
||||
|
||||
/* Remove irrelevant flags from the type mask */
|
||||
type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
|
||||
|
||||
/* Setup a fake client, so we can have proper db selection when performing
|
||||
* actions. We use one client for all handlers, writing to it will crash */
|
||||
client *c = createClient(-1);
|
||||
c->flags |= CLIENT_MODULE;
|
||||
|
||||
while((ln = listNext(&li))) {
|
||||
RedisModuleKeyspaceSubscriber *sub = ln->value;
|
||||
/* Only notify subscribers on events matching they registration,
|
||||
* and avoid subscribers triggering themselves */
|
||||
if ((sub->event_mask & type) && sub->active == 0) {
|
||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||
ctx.module = sub->module;
|
||||
selectDb(c, dbid);
|
||||
ctx.client = c;
|
||||
/* mark the handler as activer to avoid reentrant loops.
|
||||
* If the subscriber performs an action triggering itself,
|
||||
* it will not be notified about it. */
|
||||
sub->active = 1;
|
||||
sub->notify_callback(&ctx, type, event, key);
|
||||
sub->active = 0;
|
||||
moduleFreeContext(&ctx);
|
||||
}
|
||||
}
|
||||
freeClient(c);
|
||||
}
|
||||
|
||||
/* Unsubscribe any notification subscirbers this module has upon unloading */
|
||||
void moduleUnsubscribeNotifications(RedisModule *module) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(moduleKeyspaceSubscribers,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
RedisModuleKeyspaceSubscriber *sub = ln->value;
|
||||
if (sub->module == module) {
|
||||
listDelNode(moduleKeyspaceSubscribers, ln);
|
||||
zfree(sub);
|
||||
}
|
||||
}
|
||||
/* If no subscribers are left - do not call the module norification function */
|
||||
if (listLength(moduleKeyspaceSubscribers) == 0) {
|
||||
server.notify_keyspace_events &= ~NOTIFY_MODULE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Modules API internals
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -3706,9 +3833,11 @@ void moduleRegisterCoreAPI(void);
|
||||
|
||||
void moduleInitModulesSystem(void) {
|
||||
moduleUnblockedClients = listCreate();
|
||||
|
||||
|
||||
server.loadmodule_queue = listCreate();
|
||||
modules = dictCreate(&modulesDictType,NULL);
|
||||
moduleKeyspaceSubscribers = listCreate();
|
||||
|
||||
moduleRegisterCoreAPI();
|
||||
if (pipe(server.module_blocked_pipe) == -1) {
|
||||
serverLog(LL_WARNING,
|
||||
@ -3759,6 +3888,7 @@ void moduleFreeModuleStructure(struct RedisModule *module) {
|
||||
zfree(module);
|
||||
}
|
||||
|
||||
|
||||
void moduleUnregisterCommands(struct RedisModule *module) {
|
||||
/* Unregister all the commands registered by this module. */
|
||||
dictIterator *di = dictGetSafeIterator(server.commands);
|
||||
@ -3819,6 +3949,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
||||
/* Unload the module registered with the specified name. On success
|
||||
* C_OK is returned, otherwise C_ERR is returned and errno is set
|
||||
* to the following values depending on the type of error:
|
||||
@ -3840,6 +3971,9 @@ int moduleUnload(sds name) {
|
||||
|
||||
moduleUnregisterCommands(module);
|
||||
|
||||
/* Remvoe any noification subscribers this module might have */
|
||||
moduleUnsubscribeNotifications(module);
|
||||
|
||||
/* Unregister all the hooks. TODO: Yet no hooks support here. */
|
||||
|
||||
/* Unload the dynamic library. */
|
||||
@ -4037,4 +4171,5 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(DigestAddStringBuffer);
|
||||
REGISTER_API(DigestAddLongLong);
|
||||
REGISTER_API(DigestEndSequence);
|
||||
REGISTER_API(SubscribeToKeyspaceEvents);
|
||||
}
|
||||
|
Reference in New Issue
Block a user