mirror of
https://github.com/fluencelabs/redis
synced 2025-04-29 20:42:13 +00:00
min-slaves-to-write: don't accept writes with less than N replicas.
This feature allows the user to specify the minimum number of connected replicas having a lag less or equal than the specified amount of seconds for writes to be accepted.
This commit is contained in:
parent
fc2587ec6e
commit
d0d67f8d42
@ -281,7 +281,13 @@ slave-priority 100
|
|||||||
#
|
#
|
||||||
# For example to require at least 3 slaves with a lag <= 10 seconds use:
|
# For example to require at least 3 slaves with a lag <= 10 seconds use:
|
||||||
#
|
#
|
||||||
# repl-min-slaves-to-write 3 10
|
# min-slaves-to-write 3
|
||||||
|
# min-slaves-max-lag 10
|
||||||
|
#
|
||||||
|
# Setting one or the other to 0 disables the feature.
|
||||||
|
#
|
||||||
|
# By default min-slaves-to-write is set to 0 (feature disabled) and
|
||||||
|
# min-slaves-max-lag is set to 10.
|
||||||
|
|
||||||
################################## SECURITY ###################################
|
################################## SECURITY ###################################
|
||||||
|
|
||||||
|
20
src/config.c
20
src/config.c
@ -428,6 +428,16 @@ void loadServerConfigFromString(char *config) {
|
|||||||
}
|
}
|
||||||
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
|
||||||
server.slave_priority = atoi(argv[1]);
|
server.slave_priority = atoi(argv[1]);
|
||||||
|
} else if (!strcasecmp(argv[0],"min-slaves-to-write") && argc == 2) {
|
||||||
|
server.repl_min_slaves_to_write = atoi(argv[1]);
|
||||||
|
if (server.repl_min_slaves_to_write < 0) {
|
||||||
|
err = "Invalid value for min-slaves-to-write."; goto loaderr;
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp(argv[0],"min-slaves-max-lag") && argc == 2) {
|
||||||
|
server.repl_min_slaves_max_lag = atoi(argv[1]);
|
||||||
|
if (server.repl_min_slaves_max_lag < 0) {
|
||||||
|
err = "Invalid value for min-slaves-max-lag."; goto loaderr;
|
||||||
|
}
|
||||||
} else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) {
|
||||||
int flags = keyspaceEventsStringToFlags(argv[1]);
|
int flags = keyspaceEventsStringToFlags(argv[1]);
|
||||||
|
|
||||||
@ -783,6 +793,14 @@ void configSetCommand(redisClient *c) {
|
|||||||
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
ll <= 0) goto badfmt;
|
ll <= 0) goto badfmt;
|
||||||
server.slave_priority = ll;
|
server.slave_priority = ll;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"min-slaves-to-write")) {
|
||||||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
|
ll < 0) goto badfmt;
|
||||||
|
server.repl_min_slaves_to_write = ll;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"min-slaves-max-lag")) {
|
||||||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
|
ll < 0) goto badfmt;
|
||||||
|
server.repl_min_slaves_max_lag = ll;
|
||||||
} else {
|
} else {
|
||||||
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
|
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
|
||||||
(char*)c->argv[2]->ptr);
|
(char*)c->argv[2]->ptr);
|
||||||
@ -880,6 +898,8 @@ void configGetCommand(redisClient *c) {
|
|||||||
config_get_numerical_field("maxclients",server.maxclients);
|
config_get_numerical_field("maxclients",server.maxclients);
|
||||||
config_get_numerical_field("watchdog-period",server.watchdog_period);
|
config_get_numerical_field("watchdog-period",server.watchdog_period);
|
||||||
config_get_numerical_field("slave-priority",server.slave_priority);
|
config_get_numerical_field("slave-priority",server.slave_priority);
|
||||||
|
config_get_numerical_field("min-slaves-to-write",server.repl_min_slaves_to_write);
|
||||||
|
config_get_numerical_field("min-slaves-max-lag",server.repl_min_slaves_max_lag);
|
||||||
config_get_numerical_field("hz",server.hz);
|
config_get_numerical_field("hz",server.hz);
|
||||||
|
|
||||||
/* Bool (yes/no) values */
|
/* Bool (yes/no) values */
|
||||||
|
@ -692,6 +692,7 @@ void freeClient(redisClient *c) {
|
|||||||
* backlog. */
|
* backlog. */
|
||||||
if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
|
if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
|
||||||
server.repl_no_slaves_since = server.unixtime;
|
server.repl_no_slaves_since = server.unixtime;
|
||||||
|
refreshGoodSlavesCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Case 2: we lost the connection with the master. */
|
/* Case 2: we lost the connection with the master. */
|
||||||
|
15
src/redis.c
15
src/redis.c
@ -1125,6 +1125,8 @@ void createSharedObjects(void) {
|
|||||||
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
|
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
|
||||||
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
|
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
|
||||||
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
|
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
|
||||||
|
shared.noreplicaserr = createObject(REDIS_STRING,sdsnew(
|
||||||
|
"-NOREPLICAS Not enough good slaves to write.\r\n"));
|
||||||
shared.space = createObject(REDIS_STRING,sdsnew(" "));
|
shared.space = createObject(REDIS_STRING,sdsnew(" "));
|
||||||
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
|
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
|
||||||
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
|
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
|
||||||
@ -1228,6 +1230,8 @@ void initServerConfig() {
|
|||||||
server.shutdown_asap = 0;
|
server.shutdown_asap = 0;
|
||||||
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
||||||
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
||||||
|
server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
|
||||||
|
server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
|
||||||
server.lua_caller = NULL;
|
server.lua_caller = NULL;
|
||||||
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
|
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
|
||||||
server.lua_client = NULL;
|
server.lua_client = NULL;
|
||||||
@ -1429,6 +1433,7 @@ void initServer() {
|
|||||||
server.ops_sec_last_sample_ops = 0;
|
server.ops_sec_last_sample_ops = 0;
|
||||||
server.unixtime = time(NULL);
|
server.unixtime = time(NULL);
|
||||||
server.lastbgsave_status = REDIS_OK;
|
server.lastbgsave_status = REDIS_OK;
|
||||||
|
server.repl_good_slaves_count = 0;
|
||||||
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
||||||
redisPanic("Can't create the serverCron time event.");
|
redisPanic("Can't create the serverCron time event.");
|
||||||
exit(1);
|
exit(1);
|
||||||
@ -1733,6 +1738,16 @@ int processCommand(redisClient *c) {
|
|||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Don't accept write commands if there are not enough good slaves and
|
||||||
|
* used configured the min-slaves-to-write option. */
|
||||||
|
if (server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag &&
|
||||||
|
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
|
||||||
|
{
|
||||||
|
flagTransaction(c);
|
||||||
|
addReply(c, shared.noreplicaserr);
|
||||||
|
return REDIS_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* Don't accept write commands if this is a read only slave. But
|
/* Don't accept write commands if this is a read only slave. But
|
||||||
* accept write commands if this is our master. */
|
* accept write commands if this is our master. */
|
||||||
if (server.masterhost && server.repl_slave_ro &&
|
if (server.masterhost && server.repl_slave_ro &&
|
||||||
|
@ -118,6 +118,8 @@
|
|||||||
#define REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
#define REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
||||||
#define REDIS_DEFAULT_ACTIVE_REHASHING 1
|
#define REDIS_DEFAULT_ACTIVE_REHASHING 1
|
||||||
#define REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
#define REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
||||||
|
#define REDIS_DEFAULT_MIN_SLAVES_TO_WRITE 0
|
||||||
|
#define REDIS_DEFAULT_MIN_SLAVES_MAX_LAG 10
|
||||||
|
|
||||||
/* Protocol and I/O related defines */
|
/* Protocol and I/O related defines */
|
||||||
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
||||||
@ -479,7 +481,7 @@ struct sharedObjectsStruct {
|
|||||||
*colon, *nullbulk, *nullmultibulk, *queued,
|
*colon, *nullbulk, *nullmultibulk, *queued,
|
||||||
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
||||||
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
|
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
|
||||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr,
|
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||||
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
||||||
*lpush,
|
*lpush,
|
||||||
@ -676,6 +678,9 @@ struct redisServer {
|
|||||||
gets released. */
|
gets released. */
|
||||||
time_t repl_no_slaves_since; /* We have no slaves since that time.
|
time_t repl_no_slaves_since; /* We have no slaves since that time.
|
||||||
Only valid if server.slaves len is 0. */
|
Only valid if server.slaves len is 0. */
|
||||||
|
int repl_min_slaves_to_write; /* Min number of slaves to write. */
|
||||||
|
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
|
||||||
|
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
|
||||||
/* Replication (slave) */
|
/* Replication (slave) */
|
||||||
char *masterauth; /* AUTH with this password with master */
|
char *masterauth; /* AUTH with this password with master */
|
||||||
char *masterhost; /* Hostname of master */
|
char *masterhost; /* Hostname of master */
|
||||||
@ -986,6 +991,7 @@ void replicationCron(void);
|
|||||||
void replicationHandleMasterDisconnection(void);
|
void replicationHandleMasterDisconnection(void);
|
||||||
void replicationCacheMaster(redisClient *c);
|
void replicationCacheMaster(redisClient *c);
|
||||||
void resizeReplicationBacklog(long long newsize);
|
void resizeReplicationBacklog(long long newsize);
|
||||||
|
void refreshGoodSlavesCount(void);
|
||||||
|
|
||||||
/* Generic persistence functions */
|
/* Generic persistence functions */
|
||||||
void startLoading(FILE *fp);
|
void startLoading(FILE *fp);
|
||||||
|
@ -1386,6 +1386,30 @@ void replicationResurrectCachedMaster(int newfd) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
|
||||||
|
|
||||||
|
/* This function counts the number of slaves with lag <= min-slaves-max-lag.
|
||||||
|
* If the option is active, the server will prevent writes if there are not
|
||||||
|
* enough connected slaves with the specified lag (or less). */
|
||||||
|
void refreshGoodSlavesCount(void) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
int good = 0;
|
||||||
|
|
||||||
|
if (!server.repl_min_slaves_to_write ||
|
||||||
|
!server.repl_min_slaves_max_lag) return;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
time_t lag = server.unixtime - slave->repl_ack_time;
|
||||||
|
|
||||||
|
if (slave->replstate == REDIS_REPL_ONLINE &&
|
||||||
|
lag <= server.repl_min_slaves_max_lag) good++;
|
||||||
|
}
|
||||||
|
server.repl_good_slaves_count = good;
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------- REPLICATION CRON ---------------------------- */
|
/* --------------------------- REPLICATION CRON ---------------------------- */
|
||||||
|
|
||||||
/* Replication cron funciton, called 1 time per second. */
|
/* Replication cron funciton, called 1 time per second. */
|
||||||
@ -1499,4 +1523,7 @@ void replicationCron(void) {
|
|||||||
(int) server.repl_backlog_time_limit);
|
(int) server.repl_backlog_time_limit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||||
|
refreshGoodSlavesCount();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user