Merge branch 'unstable' into module_lru_lfu

This commit is contained in:
Salvatore Sanfilippo
2019-11-04 11:04:49 +01:00
committed by GitHub
20 changed files with 1501 additions and 156 deletions

View File

@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createAOFClient();
startLoadingFile(fp, filename);
startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) {
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
processModuleLoadingProgressEvent(1);
}
if (fgets(buf,sizeof(buf),fp) == NULL) {
@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
stopLoading();
stopLoading(1);
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) {
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}

View File

@ -174,6 +174,7 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
@ -430,6 +431,49 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* in order to check if we can serve clients blocked by modules using
* RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
* our goal here is to call the RedisModuleBlockedClient reply() callback to
* see if the key is really able to serve the client, and in that case,
* unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
/* Put at the tail, so that at the next call
* we'll not run into it again: clients here may not be
* ready to be served, so they'll remain in the list
* sometimes. We want also be able to skip clients that are
* not blocked for the MODULE type safely. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
if (receiver->btype != BLOCKED_MODULE) continue;
/* Note that if *this* client cannot be served by this key,
* it does not mean that another client that is next into the
* list cannot be served as well: they may be blocked by
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
moduleUnblockClient(receiver);
}
}
}
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@ -480,6 +524,10 @@ void handleClientsBlockedOnKeys(void) {
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
}
/* Free this item. */

View File

@ -465,6 +465,29 @@ int getFlushCommandFlags(client *c, int *flags) {
return C_OK;
}
/* Flushes the whole server data set. */
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyDb(-1,flags,NULL);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
@ -488,28 +511,9 @@ void flushdbCommand(client *c) {
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
server.dirty += emptyDb(-1,flags,NULL);
flushAllDataAndResetRDB(flags);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* This command implements DEL and LAZYDEL. */

View File

@ -417,7 +417,7 @@ NULL
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
int ret = rdbLoad(server.rdb_filename,NULL);
int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");

View File

@ -140,6 +140,9 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */
RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
gets called for clients blocked
on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@ -153,7 +156,7 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient {
client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
int unblocked; /* Already on the moduleUnblocked list. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@ -1620,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK;
}
/* This is an helper for moduleFireServerEvent() and other functions:
* It populates the replication info structure with the appropriate
* fields depending on the version provided. If the version is not valid
* then REDISMODULE_ERR is returned. Otherwise the function returns
* REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
int modulePopulateReplicationInfoStructure(void *ri, int structver) {
if (structver != 1) return REDISMODULE_ERR;
RedisModuleReplicationInfoV1 *ri1 = ri;
memset(ri1,0,sizeof(*ri1));
ri1->version = structver;
ri1->master = server.masterhost==NULL;
ri1->masterhost = server.masterhost? server.masterhost: "";
ri1->masterport = server.masterport;
ri1->replid1 = server.replid;
ri1->replid2 = server.replid2;
ri1->repl1_offset = server.master_repl_offset;
ri1->repl2_offset = server.second_replid_offset;
return REDISMODULE_OK;
}
/* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@ -1672,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
/* Publish a message to subscribers (see PUBLISH command). */
int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
UNUSED(ctx);
int receivers = pubsubPublishMessage(channel, message);
if (server.cluster_enabled)
clusterPropagatePublish(channel, message);
return receivers;
}
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@ -1960,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
* If restart_aof is true, you must make sure the command that triggered this call is not
* propagated to the AOF file.
* When async is set to true, db contents will be freed by a background thread. */
void RM_ResetDataset(int restart_aof, int async) {
if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
}
/* Returns the number of keys in the current db. */
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
return dictSize(ctx->client->db->dict);
}
/* Returns a name of a random key, or NULL if current db is empty. */
RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
robj *key = dbRandomKey(ctx->client->db);
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
return key;
}
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@ -4014,6 +4071,94 @@ void unblockClientFromModule(client *c) {
resetClient(c);
}
/* Block a client in the context of a module: this function implements both
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
* keys are passed or not.
*
* When not blocking for keys, the keys, numkeys, and privdata parameters are
* not needed. The privdata in that case must be NULL, since later is
* RM_UnblockClient() that will provide some private data that the reply
* callback will receive.
*
* Instead when blocking for keys, normally RM_UnblockClient() will not be
* called (because the client will unblock when the key is modified), so
* 'privdata' should be provided in that case, so that once the client is
* unlocked and the reply callback is called, it will receive its associated
* private data.
*
* Even when blocking on keys, RM_UnblockClient() can be called however, but
* in that case the privdata argument is disregarded, because we pass the
* reply callback the privdata that is set here while blocking.
*/
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
ctx->module->blocked_clients++;
/* We need to handle the invalid operation of calling modules blocking
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
bc->privdata = privdata;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL;
bc->unblocked = 0;
c->bpop.timeout = timeout;
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
addReplyError(c, islua ?
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
} else {
blockClient(c,BLOCKED_MODULE);
}
}
return bc;
}
/* This function is called from module.c in order to check if a module
* blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
* can really be unblocked, since the module was able to serve the client.
* If the callback returns REDISMODULE_OK, then the client can be unblocked,
* otherwise the client remains blocked and we'll retry again when one of
* the keys it blocked for becomes "ready" again. */
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
/* Protect against re-processing: don't serve clients that are already
* in the unblocking list for any reason (including RM_UnblockClient()
* explicit call). */
if (bc->unblocked) return REDISMODULE_ERR;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_ready_key = key;
ctx.blocked_privdata = bc->privdata;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
served = 1;
moduleFreeContext(&ctx);
return served;
}
/* Block a client in the context of a blocking command, returning an handle
* which will be used, later, in order to unblock the client with a call to
* RedisModule_UnblockClient(). The arguments specify callback functions
@ -4031,39 +4176,96 @@ void unblockClientFromModule(client *c) {
* by RedisModule_UnblockClient() call.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
}
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
ctx->module->blocked_clients++;
/* This call is similar to RedisModule_BlockClient(), however in this case we
* don't just block the client, but also ask Redis to unblock it automatically
* once certain keys become "ready", that is, contain more data.
*
* Basically this is similar to what a typical Redis command usually does,
* like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
* and later when the key receives new data (a list push for instance), the
* client is unblocked and served.
*
* However in the case of this module API, when the client is unblocked?
*
* 1. If you block ok a key of a type that has blocking operations associated,
* like a list, a sorted set, a stream, and so forth, the client may be
* unblocked once the relevant key is targeted by an operation that normally
* unblocks the native blocking operations for that type. So if we block
* on a list key, an RPUSH command may unblock our client and so forth.
* 2. If you are implementing your native data type, or if you want to add new
* unblocking conditions in addition to "1", you can call the modules API
* RedisModule_SignalKeyAsReady().
*
* Anyway we can't be sure if the client should be unblocked just because the
* key is signaled as ready: for instance a successive operation may change the
* key, or a client in queue before this one can be served, modifying the key
* as well and making it empty again. So when a client is blocked with
* RedisModule_BlockClientOnKeys() the reply callback is not called after
* RM_UnblockCLient() is called, but every time a key is signaled as ready:
* if the reply callback can serve the client, it returns REDISMODULE_OK
* and the client is unblocked, otherwise it will return REDISMODULE_ERR
* and we'll try again later.
*
* The reply callback can access the key that was signaled as ready by
* calling the API RedisModule_GetBlockedClientReadyKey(), that returns
* just the string name of the key as a RedisModuleString object.
*
* Thanks to this system we can setup complex blocking scenarios, like
* unblocking a client only if a list contains at least 5 items or other
* more fancy logics.
*
* Note that another difference with RedisModule_BlockClient(), is that here
* we pass the private data directly when blocking the client: it will
* be accessible later in the reply callback. Normally when blocking with
* RedisModule_BlockClient() the private data to reply to the client is
* passed when calling RedisModule_UnblockClient() but here the unblocking
* is performed by Redis itself, so we need to have some private data before
* hand. The private data is used to store any information about the specific
* unblocking operation that you are implementing. Such information will be
* freed using the free_privdata callback provided by the user.
*
* However the reply callback will be able to access the argument vector of
* the command, so the private data is often not needed. */
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
}
/* We need to handle the invalid operation of calling modules blocking
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
bc->privdata = NULL;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
/* This function is used in order to potentially unblock a client blocked
* on keys with RedisModule_BlockClientOnKeys(). When this function is called,
* all the clients blocked for this key will get their reply callback called,
* and if the callback returns REDISMODULE_OK the client will be unblocked. */
void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
signalKeyAsReady(ctx->client->db, key);
}
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
addReplyError(c, islua ?
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
blockClient(c,BLOCKED_MODULE);
/* Implements RM_UnblockClient() and moduleUnblockClient(). */
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
if (!bc->blocked_on_keys) bc->privdata = privdata;
bc->unblocked = 1;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
return bc;
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK;
}
/* This API is used by the Redis core to unblock a client that was blocked
* by a module. */
void moduleUnblockClient(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
moduleUnblockClientByHandle(bc,NULL);
}
/* Return true if the client 'c' was blocked by a module using
* RM_BlockClientOnKeys(). */
int moduleClientIsBlockedOnKeys(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
return bc->blocked_on_keys;
}
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
@ -4076,15 +4278,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
* Note: this function can be called from threads spawned by the module. */
* Note 1: this function can be called from threads spawned by the module.
*
* Note 2: when we unblock a client that is blocked for keys using
* the API RedisModule_BlockClientOnKeys(), the privdata argument here is
* not used, and the reply callback is called with the privdata pointer that
* was passed when blocking the client.
*
* Unblocking a client that was blocked for keys using this API will still
* require the client to get some reply, so the function will use the
* "timeout" handler in order to do so. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
if (bc->blocked_on_keys) {
/* In theory the user should always pass the timeout handler as an
* argument, but better to be safe than sorry. */
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
if (bc->unblocked) return REDISMODULE_OK;
if (bc->client) moduleBlockedClientTimedOut(bc->client);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}
@ -4144,16 +4356,19 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */
/* Call the reply callback if the client is valid and we have
* any callback. */
if (c && bc->reply_callback) {
* any callback. However the callback is not called if the client
* was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
ctx.blocked_ready_key = NULL;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@ -4241,6 +4456,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
/* Get the key that is ready when the reply callback is called in the context
* of a client blocked by RedisModule_BlockClientOnKeys(). */
RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
return ctx->blocked_ready_key;
}
/* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references
@ -4295,7 +4516,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
ctx->client = createClient(NULL);
if (bc) {
selectDb(ctx->client,bc->dbid);
ctx->client->id = bc->client->id;
if (bc->client) ctx->client->id = bc->client->id;
}
return ctx;
}
@ -5781,8 +6002,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
*
* The following sub events are available:
*
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA
* REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
* REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
*
* The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields:
@ -5792,24 +6013,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID
* uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity
* uint64_t main_repl_offset; // Replication offset
*
* RedisModuleEvent_Persistence
*
* This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available:
*
* REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start
* REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end
* REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start
* REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
* REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
* REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
* REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
* REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
* REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
*
* The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers.
* The SYNC_RDB_START sub events are happening in the forground due to
* SAVE command, FLUSHALL, or server shutdown, and the other RDB and
* AOF sub events are executed in a background fork child, so any
* action the module takes can only affect the generated AOF or RDB,
* but will not be reflected in the parent process and affect connected
* clients and commands. Also note that the AOF_START sub event may end
* up saving RDB content in case of an AOF with rdb-preamble.
*
* RedisModuleEvent_FlushDB
*
@ -5817,8 +6044,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization)
* happened. The following sub events are available:
*
* REDISMODULE_EVENT_FLUSHDB_START
* REDISMODULE_EVENT_FLUSHDB_END
* REDISMODULE_SUBEVENT_FLUSHDB_START
* REDISMODULE_SUBEVENT_FLUSHDB_END
*
* The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields:
@ -5842,12 +6069,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master.
* The following sub events are available:
*
* REDISMODULE_EVENT_LOADING_RDB_START
* REDISMODULE_EVENT_LOADING_RDB_END
* REDISMODULE_EVENT_LOADING_MASTER_RDB_START
* REDISMODULE_EVENT_LOADING_MASTER_RDB_END
* REDISMODULE_EVENT_LOADING_AOF_START
* REDISMODULE_EVENT_LOADING_AOF_END
* REDISMODULE_SUBEVENT_LOADING_RDB_START
* REDISMODULE_SUBEVENT_LOADING_AOF_START
* REDISMODULE_SUBEVENT_LOADING_REPL_START
* REDISMODULE_SUBEVENT_LOADING_ENDED
* REDISMODULE_SUBEVENT_LOADING_FAILED
*
* Note that AOF loading may start with an RDB data in case of
* rdb-preamble, in which case you'll only recieve an AOF_START event.
*
*
* RedisModuleEvent_ClientChange
*
@ -5856,8 +6086,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available:
*
* REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED
* REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED
* REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
* REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
*
* RedisModuleEvent_Shutdown
*
@ -5870,8 +6100,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected.
* The following sub events are availble:
*
* REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE
* REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE
* REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
* REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
*
* No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas
@ -5886,6 +6116,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration.
* No sub events are available.
*
* The data pointer can be casted to a RedisModuleCronLoop
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
*
* RedisModuleEvent_MasterLinkChange
*
* This is called for replicas in order to notify when the
@ -5895,8 +6130,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly.
* The following sub events are available:
*
* REDISMODULE_EVENT_MASTER_LINK_UP
* REDISMODULE_EVENT_MASTER_LINK_DOWN
* REDISMODULE_SUBEVENT_MASTER_LINK_UP
* REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
*
* RedisModuleEvent_ModuleChange
*
* This event is called when a new module is loaded or one is unloaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_MODULE_LOADED
* REDISMODULE_SUBEVENT_MODULE_UNLOADED
*
* The data pointer can be casted to a RedisModuleModuleChange
* structure with the following fields:
*
* const char* module_name; // Name of module loaded or unloaded.
* int32_t module_version; // Module version.
*
* RedisModuleEvent_LoadingProgress
*
* This event is called repeatedly called while an RDB or AOF file
* is being loaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
*
* The data pointer can be casted to a RedisModuleLoadingProgress
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
* int32_t progress; // Approximate progress between 0 and 1024,
* or -1 if unknown.
*
* The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then
@ -5955,7 +6220,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
if (el->event.id == eid && !el->module->in_hook) {
if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module;
@ -5969,6 +6234,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
RedisModuleReplicationInfoV1 riv1;
RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct
@ -5980,11 +6247,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver);
moduledata = &civ1;
} else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
} else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
RedisModule *m = data;
if (m == el->module)
continue;
mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
mcv1.module_name = m->name;
mcv1.module_version = m->ver;
moduledata = &mcv1;
} else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
moduledata = data;
}
ModulesInHooks++;
@ -6016,6 +6298,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
}
}
void processModuleLoadingProgressEvent(int is_aof) {
long long now = ustime();
static long long next_event = 0;
if (now >= next_event) {
/* Fire the loading progress modules end event. */
int progress = -1;
if (server.loading_total_bytes)
progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
server.hz,
progress};
moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
is_aof?
REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
&fi);
/* decide when the next event should fire. */
next_event = now + 1000000 / server.hz;
}
}
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@ -6184,6 +6487,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
/* Fire the loaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_LOADED,
ctx.module);
moduleFreeContext(&ctx);
return C_OK;
}
@ -6246,6 +6554,11 @@ int moduleUnload(sds name) {
module->name, error);
}
/* Fire the unloaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_UNLOADED,
module);
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
@ -6494,6 +6807,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
REGISTER_API(ResetDataset);
REGISTER_API(DbSize);
REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@ -6621,7 +6937,11 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldLongLong);
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
}

View File

@ -129,6 +129,7 @@ int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
/* Insert the new element. */
HelloTypeInsert(hto,value);
RedisModule_SignalKeyAsReady(ctx,argv[1]);
RedisModule_ReplyWithLongLong(ctx,hto->len);
RedisModule_ReplicateVerbatim(ctx);
@ -190,6 +191,77 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK;
}
/* ====================== Example of a blocking command ==================== */
/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get
* called when the key we blocked for is ready: we need to check if we
* can really serve the client, and reply OK or ERR accordingly. */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_MODULE ||
RedisModule_ModuleTypeGetType(key) != HelloType)
{
RedisModule_CloseKey(key);
return REDISMODULE_ERR;
}
/* In case the key is able to serve our blocked client, let's directly
* use our original command implementation to make this example simpler. */
RedisModule_CloseKey(key);
return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
}
/* Timeout callback for blocking command HELLOTYPE.BRANGE */
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
}
/* Private data freeing callback for HELLOTYPE.BRANGE command. */
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
REDISMODULE_NOT_USED(ctx);
RedisModule_Free(privdata);
}
/* HELLOTYPE.BRANGE key first count timeout -- This is a blocking verison of
* the RANGE operation, in order to show how to use the API
* RedisModule_BlockClientOnKeys(). */
int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 5) return RedisModule_WrongArity(ctx);
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_ModuleTypeGetType(key) != HelloType)
{
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
}
/* Parse the timeout before even trying to serve the client synchronously,
* so that we always fail ASAP on syntax errors. */
long long timeout;
if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,
"ERR invalid timeout parameter");
}
/* Can we serve the reply synchronously? */
if (type != REDISMODULE_KEYTYPE_EMPTY) {
return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
}
/* Otherwise let's block on the key. */
void *privdata = RedisModule_Alloc(100);
RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata);
return REDISMODULE_OK;
}
/* ========================== "hellotype" type methods ======================= */
@ -282,5 +354,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hellotype.brange",
HelloTypeBRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -1118,6 +1118,11 @@ void freeClient(client *c) {
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
if (c->replstate == SLAVE_STATE_ONLINE)
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
NULL);
}
/* Master/slave cleanup Case 2:

View File

@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
stopSaving(0);
return C_ERR;
}
@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
}
rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(size_t size) {
void startLoading(size_t size, int rdbflags) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
/* Fire the loading modules start event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
else if(rdbflags & RDBFLAGS_REPLICATION)
subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
else
subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
void startLoadingFile(FILE *fp, char* filename) {
void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
startLoading(sb.st_size);
startLoading(sb.st_size, rdbflags);
}
/* Refresh the loading progress info */
@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
}
/* Loading finished */
void stopLoading(void) {
void stopLoading(int success) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
/* Fire the loading modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
success?
REDISMODULE_SUBEVENT_LOADING_ENDED:
REDISMODULE_SUBEVENT_LOADING_FAILED,
NULL);
}
void startSaving(int rdbflags) {
/* Fire the persistence modules end event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
else if (getpid()!=server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
else
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}
void stopSaving(int success) {
/* Fire the persistence modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
success?
REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
NULL);
}
/* Track loading progress in order to serve client's from time to time
@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@ -2243,17 +2289,17 @@ eoferr:
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename);
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
stopLoading();
stopLoading(retval==C_OK);
return retval;
}

View File

@ -121,8 +121,10 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_SAVE_NONE 0
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
#define RDBFLAGS_REPLICATION (1<<1)
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif

View File

@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
startLoadingFile(fp, rdbfilename);
startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
while(1) {
robj *key, *val;
@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
stopLoading();
stopLoading(1);
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
stopLoading();
stopLoading(0);
return 1;
}

View File

@ -185,6 +185,8 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_REPLICA_CHANGE 6
#define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7
#define REDISMODULE_EVENT_CRON_LOOP 8
#define REDISMODULE_EVENT_MODULE_CHANGE 9
#define REDISMODULE_EVENT_LOADING_PROGRESS 10
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -230,18 +232,28 @@ static const RedisModuleEvent
RedisModuleEvent_MasterLinkChange = {
REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1
},
RedisModuleEvent_ModuleChange = {
REDISMODULE_EVENT_MODULE_CHANGE,
1
},
RedisModuleEvent_LoadingProgress = {
REDISMODULE_EVENT_LOADING_PROGRESS,
1
};
/* Those are values that are used for the 'subevent' callback argument. */
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1
#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2
#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3
#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4
#define REDISMODULE_SUBEVENT_LOADING_RDB_START 0
#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1
#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2
#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3
#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1
#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2
#define REDISMODULE_SUBEVENT_LOADING_ENDED 3
#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1
@ -249,12 +261,21 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0
#define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1
#define REDISMODULE_SUBEVENT_FLUSHDB_START 0
#define REDISMODULE_SUBEVENT_FLUSHDB_END 1
#define REDISMODULE_SUBEVENT_MODULE_LOADED 0
#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
/* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
@ -290,6 +311,22 @@ typedef struct RedisModuleClientInfo {
#define RedisModuleClientInfo RedisModuleClientInfoV1
#define REDISMODULE_REPLICATIONINFO_VERSION 1
typedef struct RedisModuleReplicationInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int master; /* true if master, false if replica */
char *masterhost; /* master instance hostname for NOW_REPLICA */
int masterport; /* master instance port for NOW_REPLICA */
char *replid1; /* Main replication ID */
char *replid2; /* Secondary replication ID */
uint64_t repl1_offset; /* Main replication offset */
uint64_t repl2_offset; /* Offset of replid2 validity */
} RedisModuleReplicationInfoV1;
#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1
#define REDISMODULE_FLUSHINFO_VERSION 1
typedef struct RedisModuleFlushInfo {
uint64_t version; /* Not used since this structure is never passed
@ -301,6 +338,39 @@ typedef struct RedisModuleFlushInfo {
#define RedisModuleFlushInfo RedisModuleFlushInfoV1
#define REDISMODULE_MODULE_CHANGE_VERSION 1
typedef struct RedisModuleModuleChange {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
const char* module_name;/* Name of module loaded or unloaded. */
int32_t module_version; /* Module version. */
} RedisModuleModuleChangeV1;
#define RedisModuleModuleChange RedisModuleModuleChangeV1
#define REDISMODULE_CRON_LOOP_VERSION 1
typedef struct RedisModuleCronLoopInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
} RedisModuleCronLoopV1;
#define RedisModuleCronLoop RedisModuleCronLoopV1
#define REDISMODULE_LOADING_PROGRESS_VERSION 1
typedef struct RedisModuleLoadingProgressInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
int32_t progress; /* Approximate progress between 0 and 1024, or -1
* if unknown. */
} RedisModuleLoadingProgressV1;
#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@ -420,6 +490,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
@ -439,6 +512,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
@ -509,6 +583,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle);
int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
@ -622,6 +699,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
REDISMODULE_GET_API(ResetDataset);
REDISMODULE_GET_API(DbSize);
REDISMODULE_GET_API(RandomKey);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
@ -708,9 +788,13 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(InfoAddFieldLongLong);
REDISMODULE_GET_API(InfoAddFieldULongLong);
REDISMODULE_GET_API(GetClientInfoById);
REDISMODULE_GET_API(PublishMessage);
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(SetLRUOrLFU);
REDISMODULE_GET_API(GetLRUOrLFU);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);
REDISMODULE_GET_API(GetBlockedClientReadyKey);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);

View File

@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) {
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) {
return;
}
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
stopLoading();
stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) {
* gets promoted. */
return;
}
stopLoading();
stopLoading(1);
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake();
return;
}
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) {
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT;
}
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
sdsfree(server.masterhost);
server.masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;

View File

@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0;
}
/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);
server.cronloops++;
return 1000/server.hz;
}
@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) {
}
}
/* Fire the shutdown modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);

View File

@ -1602,6 +1602,10 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
/* Utils */
long long ustime(void);
@ -1831,10 +1835,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename);
void startLoading(size_t size);
void startLoadingFile(FILE* fp, char* filename, int rdbflags);
void startLoading(size_t size, int rdbflags);
void loadingProgress(off_t pos);
void stopLoading(void);
void stopLoading(int success);
void startSaving(int rdbflags);
void stopSaving(int success);
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
@ -1843,7 +1849,6 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */
@ -1859,6 +1864,7 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
@ -2099,6 +2105,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);