From eb706b42023d22fcc06e4c79973d0c53c09de7ba Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 21 Jul 2015 11:55:17 +0300 Subject: [PATCH] introduce REPLCONF eof-supported and repl-diskless-load --- redis.conf | 4 ++ src/config.c | 9 ++++ src/networking.c | 1 + src/redis.c | 1 + src/redis.h | 5 ++- src/replication.c | 68 +++++++++++++++++++++---------- tests/integration/replication.tcl | 6 +-- 7 files changed, 69 insertions(+), 25 deletions(-) diff --git a/redis.conf b/redis.conf index 0f164af1..92718346 100644 --- a/redis.conf +++ b/redis.conf @@ -318,6 +318,10 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. repl-diskless-sync-delay 5 +# Enable diskless replication on slave side. +# Load RDB directly from the socket rather than saving it to disk first. +repl-diskless-load no + # Slaves send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_slave_period option. The default value is 10 # seconds. 
diff --git a/src/config.c b/src/config.c index fd56269e..8c24f93b 100644 --- a/src/config.c +++ b/src/config.c @@ -329,6 +329,10 @@ void loadServerConfigFromString(char *config) { if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) { + if ((server.repl_diskless_load = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { server.repl_diskless_sync_delay = atoi(argv[1]); if (server.repl_diskless_sync_delay < 0) { @@ -861,6 +865,8 @@ void configSetCommand(redisClient *c) { "repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay) { } config_set_bool_field( "repl-diskless-sync",server.repl_diskless_sync) { + } config_set_bool_field( + "repl-diskless-load",server.repl_diskless_load) { } config_set_bool_field( "cluster-require-full-coverage",server.cluster_require_full_coverage) { } config_set_bool_field( @@ -1109,6 +1115,8 @@ void configGetCommand(redisClient *c) { server.repl_disable_tcp_nodelay); config_get_bool_field("repl-diskless-sync", server.repl_diskless_sync); + config_get_bool_field("repl-diskless-load", + server.repl_diskless_load); config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); config_get_bool_field("aof-load-truncated", @@ -1780,6 +1788,7 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT); rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY); rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC); + rewriteConfigYesNoOption(state,"repl-diskless-load",server.repl_diskless_load,REDIS_DEFAULT_REPL_DISKLESS_LOAD); 
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE); diff --git a/src/networking.c b/src/networking.c index eb033ae6..6369c9f6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -121,6 +121,7 @@ redisClient *createClient(int fd) { c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); c->peerid = NULL; + c->repl_eof_supported = 0; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (fd != -1) listAddNodeTail(server.clients,c); diff --git a/src/redis.c b/src/redis.c index f963042e..fcabfca4 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1522,6 +1522,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_load = REDIS_DEFAULT_REPL_DISKLESS_LOAD; server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index 09ac289c..901bab9a 100644 --- a/src/redis.h +++ b/src/redis.h @@ -118,6 +118,7 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" #define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0 +#define REDIS_DEFAULT_REPL_DISKLESS_LOAD 0 #define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 @@ -570,6 +571,7 @@ typedef struct redisClient { dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ sds peerid; /* Cached peer ID. */ + int repl_eof_supported; /* slave supports EOF based, diskless replication */ /* Response buffer */ int bufpos; @@ -828,7 +830,8 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ diff --git a/src/replication.c b/src/replication.c index 7cdfdb16..90ac9c4c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -442,13 +442,13 @@ need_full_resync: * the script cache is flushed before to start. * * Returns REDIS_OK on success or REDIS_ERR otherwise. */ -int startBgsaveForReplication(void) { +int startBgsaveForReplication(int use_eof) { int retval; redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", - server.repl_diskless_sync ? "slaves sockets" : "disk"); + use_eof ? 
"slaves sockets" : "disk"); - if (server.repl_diskless_sync) + if (use_eof) retval = rdbSaveToSlavesSockets(); else retval = rdbSaveBackground(server.rdb_filename); @@ -553,7 +553,7 @@ void syncCommand(redisClient *c) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - if (server.repl_diskless_sync) { + if (server.repl_diskless_sync && c->repl_eof_supported) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ @@ -562,7 +562,7 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); } else { /* Ok we don't have a BGSAVE in progress, let's start one. */ - if (startBgsaveForReplication() != REDIS_OK) { + if (startBgsaveForReplication(0) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; @@ -613,6 +613,8 @@ void replconfCommand(redisClient *c) { &port,NULL) != REDIS_OK)) return; c->slave_listening_port = port; + } else if (!strcasecmp(c->argv[j]->ptr,"eof-supported")) { + c->repl_eof_supported = 1; } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an @@ -745,7 +747,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { * (if it had a disk or socket target). 
*/ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; - int startbgsave = 0; + int slaves_waiting_eof = 0; + int slaves_waiting_noneof = 0; listIter li; listRewind(server.slaves,&li); @@ -753,7 +756,10 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { - startbgsave = 1; + if (slave->repl_eof_supported) + slaves_waiting_eof++; + else + slaves_waiting_noneof++; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; @@ -801,8 +807,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { } } } - if (startbgsave) { - if (startBgsaveForReplication() != REDIS_OK) { + if (slaves_waiting_eof || slaves_waiting_noneof) { + /* Use the socket (EOF) target only when repl-diskless-sync is enabled and every waiting slave supports EOF; otherwise do a disk-based BGSAVE. */ + if (startBgsaveForReplication(server.repl_diskless_sync && slaves_waiting_noneof==0) != REDIS_OK) { listIter li; listRewind(server.slaves,&li); @@ -825,7 +832,7 @@ void replicationAbortSyncTransfer(void) { aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); close(server.repl_transfer_s); - if (!server.repl_diskless_sync) { + if (server.repl_transfer_fd!=-1) { close(server.repl_transfer_fd); unlink(server.repl_transfer_tmpfile); zfree(server.repl_transfer_tmpfile); @@ -938,18 +945,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * at the next call. */ server.repl_transfer_size = 0; redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving streamed RDB from master"); + "MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s", + server.repl_diskless_load? 
"to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + "MASTER <-> SLAVE sync: receiving %lld bytes from master %s", + (long long) server.repl_transfer_size, + server.repl_diskless_load? "to parser":"to disk"); } return; } - if (!server.repl_diskless_sync) { + if (!server.repl_diskless_load) { /* read the data from the socket, store it to a file and search for the EOF */ if (usemark) { readlen = sizeof(buf); @@ -1033,7 +1042,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); - if (server.repl_diskless_sync) { + if (server.repl_diskless_load) { rio rdb; rioInitWithFd(&rdb,fd); /* Put the socket in blocking mode to simplify RDB transfer. @@ -1370,6 +1379,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } sdsfree(err); } + + /* Inform the master that this slave supports EOF marker of diskless-sync */ + { + err = sendSynchronousCommand(fd,"REPLCONF","eof-supported","yes", + NULL); + /* Ignore the error if any, not all the Redis versions support + * REPLCONF eof-supported. */ + if (err[0] == '-') { + redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF eof-supported: %s", err); + } + sdsfree(err); + } /* Try a partial resynchonization. 
If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC @@ -1395,7 +1416,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - if (!server.repl_diskless_sync) { + if (!server.repl_diskless_load) { while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); @@ -2133,7 +2154,8 @@ void replicationCron(void) { * slaves in WAIT_BGSAVE_START state. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; - int slaves_waiting = 0; + int slaves_waiting_eof = 0; + int slaves_waiting_noneof = 0; listNode *ln; listIter li; @@ -2143,14 +2165,18 @@ void replicationCron(void) { if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; - slaves_waiting++; + if (slave->repl_eof_supported) + slaves_waiting_eof++; + else + slaves_waiting_noneof++; } } - if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { + if ((slaves_waiting_eof || slaves_waiting_noneof) && max_idle > server.repl_diskless_sync_delay) { /* Start a BGSAVE. Usually with socket target, or with disk target - * if there was a recent socket -> disk config change. */ - if (startBgsaveForReplication() == REDIS_OK) { + * if there was a recent socket -> disk config change. + * A disk target is also used when at least one waiting slave doesn't support EOF. */ + if (startBgsaveForReplication(server.repl_diskless_sync && slaves_waiting_noneof==0) == REDIS_OK) { /* It started! We need to change the state of slaves * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case * the current target is disk. 
Otherwise it was already done diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index ece76491..bc983e95 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -152,9 +152,9 @@ foreach mdl {no yes} { lappend slaves [srv 0 client] test "Connect multiple slaves at the same time (issue #141), master diskless=$mdl, slave diskless=$sdl" { # Send SLAVEOF commands to slaves - [lindex $slaves 0] config set repl-diskless-sync $sdl - [lindex $slaves 1] config set repl-diskless-sync $sdl - [lindex $slaves 2] config set repl-diskless-sync $sdl + [lindex $slaves 0] config set repl-diskless-load $sdl + [lindex $slaves 1] config set repl-diskless-load $sdl + [lindex $slaves 2] config set repl-diskless-load $sdl [lindex $slaves 0] slaveof $master_host $master_port [lindex $slaves 1] slaveof $master_host $master_port [lindex $slaves 2] slaveof $master_host $master_port