mirror of
https://github.com/fluencelabs/redis
synced 2025-06-13 09:11:20 +00:00
Module API for Forking
* create module API for forking child processes. * refactor duplicate code around creating and tracking forks by AOF and RDB. * child processes listen to SIGUSR1 and dies exitFromChild in order to eliminate a valgrind warning of unhandled signal. * note that BGSAVE error reply has changed. valgrind error is: Process terminating with default action of signal 10 (SIGUSR1)
This commit is contained in:
90
src/server.c
90
src/server.c
@ -1447,12 +1447,18 @@ int incrementallyRehash(int dbid) {
|
||||
* for dict.c to resize the hash tables accordingly to the fact we have o not
|
||||
* running childs. */
|
||||
void updateDictResizePolicy(void) {
|
||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
|
||||
if (!hasForkChild())
|
||||
dictEnableResize();
|
||||
else
|
||||
dictDisableResize();
|
||||
}
|
||||
|
||||
int hasForkChild() {
|
||||
return server.rdb_child_pid != -1 ||
|
||||
server.aof_child_pid != -1 ||
|
||||
server.module_child_pid != -1;
|
||||
}
|
||||
|
||||
/* ======================= Cron: called every 100 ms ======================== */
|
||||
|
||||
/* Add a sample to the operations per second array of samples. */
|
||||
@ -1689,7 +1695,7 @@ void databasesCron(void) {
|
||||
/* Perform hash tables rehashing if needed, but only if there are no
|
||||
* other processes saving the DB on disk. Otherwise rehashing is bad
|
||||
* as will cause a lot of copy-on-write of memory pages. */
|
||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
|
||||
if (!hasForkChild()) {
|
||||
/* We use global counters so if we stop the computation at a given
|
||||
* DB we'll be able to start from the successive in the next
|
||||
* cron loop iteration. */
|
||||
@ -1886,15 +1892,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
|
||||
/* Start a scheduled AOF rewrite if this was requested by the user while
|
||||
* a BGSAVE was in progress. */
|
||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
|
||||
if (!hasForkChild() &&
|
||||
server.aof_rewrite_scheduled)
|
||||
{
|
||||
rewriteAppendOnlyFileBackground();
|
||||
}
|
||||
|
||||
/* Check if a background saving or AOF rewrite in progress terminated. */
|
||||
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
|
||||
ldbPendingChildren())
|
||||
if (hasForkChild() || ldbPendingChildren())
|
||||
{
|
||||
int statloc;
|
||||
pid_t pid;
|
||||
@ -1907,16 +1912,20 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
|
||||
if (pid == -1) {
|
||||
serverLog(LL_WARNING,"wait3() returned an error: %s. "
|
||||
"rdb_child_pid = %d, aof_child_pid = %d",
|
||||
"rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
|
||||
strerror(errno),
|
||||
(int) server.rdb_child_pid,
|
||||
(int) server.aof_child_pid);
|
||||
(int) server.aof_child_pid,
|
||||
(int) server.module_child_pid);
|
||||
} else if (pid == server.rdb_child_pid) {
|
||||
backgroundSaveDoneHandler(exitcode,bysignal);
|
||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||
} else if (pid == server.aof_child_pid) {
|
||||
backgroundRewriteDoneHandler(exitcode,bysignal);
|
||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||
} else if (pid == server.module_child_pid) {
|
||||
ModuleForkDoneHandler(exitcode,bysignal);
|
||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||
} else {
|
||||
if (!ldbRemoveChild(pid)) {
|
||||
serverLog(LL_WARNING,
|
||||
@ -1954,8 +1963,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
|
||||
/* Trigger an AOF rewrite if needed. */
|
||||
if (server.aof_state == AOF_ON &&
|
||||
server.rdb_child_pid == -1 &&
|
||||
server.aof_child_pid == -1 &&
|
||||
!hasForkChild() &&
|
||||
server.aof_rewrite_perc &&
|
||||
server.aof_current_size > server.aof_rewrite_min_size)
|
||||
{
|
||||
@ -2013,7 +2021,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
* Note: this code must be after the replicationCron() call above so
|
||||
* make sure when refactoring this file to keep this order. This is useful
|
||||
* because we want to give priority to RDB savings for replication. */
|
||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
|
||||
if (!hasForkChild() &&
|
||||
server.rdb_bgsave_scheduled &&
|
||||
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
|
||||
server.lastbgsave_status == C_OK))
|
||||
@ -2784,6 +2792,7 @@ void initServer(void) {
|
||||
server.cronloops = 0;
|
||||
server.rdb_child_pid = -1;
|
||||
server.aof_child_pid = -1;
|
||||
server.module_child_pid = -1;
|
||||
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
||||
server.rdb_bgsave_scheduled = 0;
|
||||
server.child_info_pipe[0] = -1;
|
||||
@ -2802,6 +2811,7 @@ void initServer(void) {
|
||||
server.stat_peak_memory = 0;
|
||||
server.stat_rdb_cow_bytes = 0;
|
||||
server.stat_aof_cow_bytes = 0;
|
||||
server.stat_module_cow_bytes = 0;
|
||||
server.cron_malloc_stats.zmalloc_used = 0;
|
||||
server.cron_malloc_stats.process_rss = 0;
|
||||
server.cron_malloc_stats.allocator_allocated = 0;
|
||||
@ -4040,7 +4050,9 @@ sds genRedisInfoString(char *section) {
|
||||
"aof_current_rewrite_time_sec:%jd\r\n"
|
||||
"aof_last_bgrewrite_status:%s\r\n"
|
||||
"aof_last_write_status:%s\r\n"
|
||||
"aof_last_cow_size:%zu\r\n",
|
||||
"aof_last_cow_size:%zu\r\n"
|
||||
"module_fork_in_progress:%d\r\n"
|
||||
"module_fork_last_cow_size:%zu\r\n",
|
||||
server.loading,
|
||||
server.dirty,
|
||||
server.rdb_child_pid != -1,
|
||||
@ -4058,7 +4070,9 @@ sds genRedisInfoString(char *section) {
|
||||
-1 : time(NULL)-server.aof_rewrite_time_start),
|
||||
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
|
||||
(server.aof_last_write_status == C_OK) ? "ok" : "err",
|
||||
server.stat_aof_cow_bytes);
|
||||
server.stat_aof_cow_bytes,
|
||||
server.module_child_pid != -1,
|
||||
server.stat_module_cow_bytes);
|
||||
|
||||
if (server.aof_enabled) {
|
||||
info = sdscatprintf(info,
|
||||
@ -4554,6 +4568,58 @@ void setupSignalHandlers(void) {
|
||||
return;
|
||||
}
|
||||
|
||||
static void sigKillChildHandler(int sig) {
|
||||
UNUSED(sig);
|
||||
/* this handler is needed to resolve a valgrind warning */
|
||||
serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now.");
|
||||
exitFromChild(1);
|
||||
}
|
||||
|
||||
void setupChildSignalHandlers(void) {
|
||||
struct sigaction act;
|
||||
|
||||
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
|
||||
* Otherwise, sa_handler is used. */
|
||||
sigemptyset(&act.sa_mask);
|
||||
act.sa_flags = 0;
|
||||
act.sa_handler = sigKillChildHandler;
|
||||
sigaction(SIGUSR1, &act, NULL);
|
||||
return;
|
||||
}
|
||||
|
||||
int redisFork() {
|
||||
int childpid;
|
||||
long long start = ustime();
|
||||
if ((childpid = fork()) == 0) {
|
||||
/* Child */
|
||||
closeListeningSockets(0);
|
||||
setupChildSignalHandlers();
|
||||
} else {
|
||||
/* Parent */
|
||||
server.stat_fork_time = ustime()-start;
|
||||
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
||||
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
||||
if (childpid == -1) {
|
||||
return -1;
|
||||
}
|
||||
updateDictResizePolicy();
|
||||
}
|
||||
return childpid;
|
||||
}
|
||||
|
||||
void sendChildCOWInfo(int ptype, char *pname) {
|
||||
size_t private_dirty = zmalloc_get_private_dirty(-1);
|
||||
|
||||
if (private_dirty) {
|
||||
serverLog(LL_NOTICE,
|
||||
"%s: %zu MB of memory used by copy-on-write",
|
||||
pname, private_dirty/(1024*1024));
|
||||
}
|
||||
|
||||
server.child_info_data.cow_size = private_dirty;
|
||||
sendChildInfo(ptype);
|
||||
}
|
||||
|
||||
void memtest(size_t megabytes, int passes);
|
||||
|
||||
/* Returns 1 if there is --sentinel among the arguments or if
|
||||
|
Reference in New Issue
Block a user