Fix master timeout during handshake

This change allows a slave to properly time out a dead master while it is
still in the handshake states of the extended asynchronous synchronization
state machine.  Slaves now record their last interaction with the master
and apply the replication timeout even before a response to the PSYNC
request is received.
This commit is contained in:
Kevin McGehee 2015-10-14 12:03:47 -07:00 committed by antirez
parent 30978004b3
commit dc03e4c51b

View File

@ -41,6 +41,7 @@ void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd); void replicationResurrectCachedMaster(int newfd);
void replicationSendAck(void); void replicationSendAck(void);
void putSlaveOnline(redisClient *slave); void putSlaveOnline(redisClient *slave);
int serverInHandshakeState(int repl_state);
/* --------------------------- Utility functions ---------------------------- */ /* --------------------------- Utility functions ---------------------------- */
@ -1196,6 +1197,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
return sdscatprintf(sdsempty(),"-Reading from master: %s", return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno)); strerror(errno));
} }
server.repl_transfer_lastio = server.unixtime;
return sdsnew(buf); return sdsnew(buf);
} }
return NULL; return NULL;
@ -1626,7 +1628,7 @@ void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s; int fd = server.repl_transfer_s;
redisAssert(server.repl_state == REDIS_REPL_CONNECTING || redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG); serverInHandshakeState(server.repl_state));
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd); close(fd);
server.repl_transfer_s = -1; server.repl_transfer_s = -1;
@ -1645,7 +1647,7 @@ int cancelReplicationHandshake(void) {
if (server.repl_state == REDIS_REPL_TRANSFER) { if (server.repl_state == REDIS_REPL_TRANSFER) {
replicationAbortSyncTransfer(); replicationAbortSyncTransfer();
} else if (server.repl_state == REDIS_REPL_CONNECTING || } else if (server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG) serverInHandshakeState(server.repl_state))
{ {
undoConnectWithMaster(); undoConnectWithMaster();
} else { } else {
@ -1802,6 +1804,20 @@ void roleCommand(redisClient *c) {
} }
} }
/* Tell whether 'repl_state' is one of the replication handshake states.
 *
 * The handshake states are the PONG, AUTH, PORT, CAPA and PSYNC send/receive
 * steps listed below. REDIS_REPL_CONNECTING is deliberately not part of the
 * set: callers that care about it test for it separately.
 *
 * Returns 1 when 'repl_state' is a handshake state, 0 otherwise. */
int serverInHandshakeState(int repl_state) {
    switch (repl_state) {
    case REDIS_REPL_RECEIVE_PONG:
    case REDIS_REPL_SEND_AUTH:
    case REDIS_REPL_RECEIVE_AUTH:
    case REDIS_REPL_SEND_PORT:
    case REDIS_REPL_RECEIVE_PORT:
    case REDIS_REPL_SEND_CAPA:
    case REDIS_REPL_RECEIVE_CAPA:
    case REDIS_REPL_SEND_PSYNC:
    case REDIS_REPL_RECEIVE_PSYNC:
        return 1;
    default:
        /* Any other state (including REDIS_REPL_CONNECTING) is not
         * considered part of the handshake. */
        return 0;
    }
}
/* Send a REPLCONF ACK command to the master to inform it about the current /* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset. If we are not connected with a master, the command has * processed offset. If we are not connected with a master, the command has
* no effects. */ * no effects. */
@ -2186,7 +2202,7 @@ void replicationCron(void) {
/* Non blocking connection timeout? */ /* Non blocking connection timeout? */
if (server.masterhost && if (server.masterhost &&
(server.repl_state == REDIS_REPL_CONNECTING || (server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG) && serverInHandshakeState(server.repl_state)) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{ {
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");