introduce REPLCONF eof-supported and repl-diskless-load

This commit is contained in:
Oran Agra 2015-07-21 11:55:17 +03:00
parent 9e67df2a39
commit eb706b4202
7 changed files with 69 additions and 25 deletions

View File

@ -318,6 +318,10 @@ repl-diskless-sync no
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
# Enable diskless replication on slave side.
# Load RDB directly from the socket rather than saving it to disk first.
repl-diskless-load no
# Slaves send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_slave_period option. The default value is 10
# seconds.

View File

@ -329,6 +329,10 @@ void loadServerConfigFromString(char *config) {
if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) {
if ((server.repl_diskless_load = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
server.repl_diskless_sync_delay = atoi(argv[1]);
if (server.repl_diskless_sync_delay < 0) {
@ -861,6 +865,8 @@ void configSetCommand(redisClient *c) {
"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay) {
} config_set_bool_field(
"repl-diskless-sync",server.repl_diskless_sync) {
} config_set_bool_field(
"repl-diskless-load",server.repl_diskless_load) {
} config_set_bool_field(
"cluster-require-full-coverage",server.cluster_require_full_coverage) {
} config_set_bool_field(
@ -1109,6 +1115,8 @@ void configGetCommand(redisClient *c) {
server.repl_disable_tcp_nodelay);
config_get_bool_field("repl-diskless-sync",
server.repl_diskless_sync);
config_get_bool_field("repl-diskless-load",
server.repl_diskless_load);
config_get_bool_field("aof-rewrite-incremental-fsync",
server.aof_rewrite_incremental_fsync);
config_get_bool_field("aof-load-truncated",
@ -1780,6 +1788,7 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY);
rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC);
rewriteConfigYesNoOption(state,"repl-diskless-load",server.repl_diskless_load,REDIS_DEFAULT_REPL_DISKLESS_LOAD);
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE);

View File

@ -121,6 +121,7 @@ redisClient *createClient(int fd) {
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->repl_eof_supported = 0;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) listAddNodeTail(server.clients,c);

View File

@ -1522,6 +1522,7 @@ void initServerConfig(void) {
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC;
server.repl_diskless_load = REDIS_DEFAULT_REPL_DISKLESS_LOAD;
server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;

View File

@ -118,6 +118,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_DEFAULT_RDB_CHECKSUM 1
#define REDIS_DEFAULT_RDB_FILENAME "dump.rdb"
#define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0
#define REDIS_DEFAULT_REPL_DISKLESS_LOAD 0
#define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define REDIS_DEFAULT_SLAVE_READ_ONLY 1
@ -570,6 +571,7 @@ typedef struct redisClient {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
int repl_eof_supported; /* slave supports EOF based, diskless replication */
/* Response buffer */
int bufpos;
@ -828,7 +830,8 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_load; /* Slave parse RDB directly from the socket. */
int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */

View File

@ -442,13 +442,13 @@ need_full_resync:
* the script cache is flushed before to start.
*
* Returns REDIS_OK on success or REDIS_ERR otherwise. */
int startBgsaveForReplication(void) {
int startBgsaveForReplication(int use_eof) {
int retval;
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
server.repl_diskless_sync ? "slaves sockets" : "disk");
use_eof ? "slaves sockets" : "disk");
if (server.repl_diskless_sync)
if (use_eof)
retval = rdbSaveToSlavesSockets();
else
retval = rdbSaveBackground(server.rdb_filename);
@ -553,7 +553,7 @@ void syncCommand(redisClient *c) {
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
} else {
if (server.repl_diskless_sync) {
if (server.repl_diskless_sync && c->repl_eof_supported) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
@ -562,7 +562,7 @@ void syncCommand(redisClient *c) {
redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
} else {
/* Ok we don't have a BGSAVE in progress, let's start one. */
if (startBgsaveForReplication() != REDIS_OK) {
if (startBgsaveForReplication(0) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
@ -613,6 +613,8 @@ void replconfCommand(redisClient *c) {
&port,NULL) != REDIS_OK))
return;
c->slave_listening_port = port;
} else if (!strcasecmp(c->argv[j]->ptr,"eof-supported")) {
c->repl_eof_supported = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@ -745,7 +747,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
* (if it had a disk or socket target). */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
int slaves_waiting_eof = 0;
int slaves_waiting_noneof = 0;
listIter li;
listRewind(server.slaves,&li);
@ -753,7 +756,10 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
if (slave->repl_eof_supported)
slaves_waiting_eof++;
else
slaves_waiting_noneof++;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
@ -801,8 +807,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
}
}
}
if (startbgsave) {
if (startBgsaveForReplication() != REDIS_OK) {
if (slaves_waiting_eof || slaves_waiting_noneof) {
/* if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */
if (startBgsaveForReplication(slaves_waiting_noneof==0) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
@ -825,7 +832,7 @@ void replicationAbortSyncTransfer(void) {
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
close(server.repl_transfer_s);
if (!server.repl_diskless_sync) {
if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
@ -938,18 +945,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving streamed RDB from master");
"MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s",
server.repl_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
"MASTER <-> SLAVE sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
server.repl_diskless_load? "to parser":"to disk");
}
return;
}
if (!server.repl_diskless_sync) {
if (!server.repl_diskless_load) {
/* read the data from the socket, store it to a file and search for the EOF */
if (usemark) {
readlen = sizeof(buf);
@ -1033,7 +1042,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
if (server.repl_diskless_sync) {
if (server.repl_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd);
/* Put the socket in blocking mode to simplify RDB transfer.
@ -1370,6 +1379,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
sdsfree(err);
}
/* Inform the master that this slave supports EOF marker of diskless-sync */
{
err = sendSynchronousCommand(fd,"REPLCONF","eof-supported","yes",
NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF eof-supported. */
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF eof-supported: %s", err);
}
sdsfree(err);
}
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
@ -1395,7 +1416,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
if (!server.repl_diskless_sync) {
if (!server.repl_diskless_load) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
@ -2133,7 +2154,8 @@ void replicationCron(void) {
* slaves in WAIT_BGSAVE_START state. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int slaves_waiting_eof = 0;
int slaves_waiting_noneof = 0;
listNode *ln;
listIter li;
@ -2143,14 +2165,18 @@ void replicationCron(void) {
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
if (slave->repl_eof_supported)
slaves_waiting_eof++;
else
slaves_waiting_noneof++;
}
}
if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
if ((slaves_waiting_eof || slaves_waiting_noneof) && max_idle > server.repl_diskless_sync_delay) {
/* Start a BGSAVE. Usually with socket target, or with disk target
* if there was a recent socket -> disk config change. */
if (startBgsaveForReplication() == REDIS_OK) {
* if there was a recent socket -> disk config change.
* if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */
if (startBgsaveForReplication(slaves_waiting_noneof==0) == REDIS_OK) {
/* It started! We need to change the state of slaves
* from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
* the current target is disk. Otherwise it was already done

View File

@ -152,9 +152,9 @@ foreach mdl {no yes} {
lappend slaves [srv 0 client]
test "Connect multiple slaves at the same time (issue #141), master diskless=$mdl, slave diskless=$sdl" {
# Send SLAVEOF commands to slaves
[lindex $slaves 0] config set repl-diskless-sync $sdl
[lindex $slaves 1] config set repl-diskless-sync $sdl
[lindex $slaves 2] config set repl-diskless-sync $sdl
[lindex $slaves 0] config set repl-diskless-load $sdl
[lindex $slaves 1] config set repl-diskless-load $sdl
[lindex $slaves 2] config set repl-diskless-load $sdl
[lindex $slaves 0] slaveof $master_host $master_port
[lindex $slaves 1] slaveof $master_host $master_port
[lindex $slaves 2] slaveof $master_host $master_port