mirror of
https://github.com/fluencelabs/redis
synced 2025-06-22 13:31:32 +00:00
PSYNC2: different improvements to Redis replication.
The gist of the changes is that now, partial resynchronizations between slaves and masters (without the need of a full resync with RDB transfer and so forth), work in a number of cases when it was impossible in the past. For instance: 1. When a slave is promoted to mastrer, the slaves of the old master can partially resynchronize with the new master. 2. Chained slalves (slaves of slaves) can be moved to replicate to other slaves or the master itsef, without requiring a full resync. 3. The master itself, after being turned into a slave, is able to partially resynchronize with the new master, when it joins replication again. In order to obtain this, the following main changes were operated: * Slaves also take a replication backlog, not just masters. * Same stream replication for all the slaves and sub slaves. The replication stream is identical from the top level master to its slaves and is also the same from the slaves to their sub-slaves and so forth. This means that if a slave is later promoted to master, it has the same replication backlong, and can partially resynchronize with its slaves (that were previously slaves of the old master). * A given replication history is no longer identified by the `runid` of a Redis node. There is instead a `replication ID` which changes every time the instance has a new history no longer coherent with the past one. So, for example, slaves publish the same replication history of their master, however when they are turned into masters, they publish a new replication ID, but still remember the old ID, so that they are able to partially resynchronize with slaves of the old master (up to a given offset). * The replication protocol was slightly modified so that a new extended +CONTINUE reply from the master is able to inform the slave of a replication ID change. * REPLCONF CAPA is used in order to notify masters that a slave is able to understand the new +CONTINUE reply. * The RDB file was extended with an auxiliary field that is able to select a given DB after loading in the slave, so that the slave can continue receiving the replication stream from the point it was disconnected without requiring the master to insert "SELECT" statements. This is useful in order to guarantee the "same stream" property, because the slave must be able to accumulate an identical backlog. * Slave pings to sub-slaves are now sent in a special form, when the top-level master is disconnected, in order to don't interfer with the replication stream. We just use out of band "\n" bytes as in other parts of the Redis protocol. An old design document is available here: https://gist.github.com/antirez/ae068f95c0d084891305 However the implementation is not identical to the description because during the work to implement it, different changes were needed in order to make things working well.
This commit is contained in:
47
src/server.h
47
src/server.h
@ -293,7 +293,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
|
||||
/* Slave capabilities. */
|
||||
#define SLAVE_CAPA_NONE 0
|
||||
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
||||
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
||||
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
||||
|
||||
/* Synchronous read timeout - slave side */
|
||||
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
||||
@ -679,8 +680,8 @@ typedef struct client {
|
||||
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
|
||||
copying this slave output buffer
|
||||
should use. */
|
||||
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
|
||||
int slave_listening_port; /* As configured with: REPLCONF listening-port */
|
||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
|
||||
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
||||
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
|
||||
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
|
||||
multiState mstate; /* MULTI/EXEC state */
|
||||
@ -803,6 +804,20 @@ struct redisMemOverhead {
|
||||
} *db;
|
||||
};
|
||||
|
||||
/* This structure can be optionally passed to RDB save/load functions in
|
||||
* order to implement additional functionalities, by storing and loading
|
||||
* metadata to the RDB file.
|
||||
*
|
||||
* Currently the only use is to select a DB at load time, useful in
|
||||
* replication in order to make sure that chained slaves (slaves of slaves)
|
||||
* select the correct DB and are able to accept the stream coming from the
|
||||
* top-level master. */
|
||||
typedef struct rdbSaveInfo {
|
||||
int repl_stream_db; /* DB to select in server.master client. */
|
||||
} rdbSaveInfo;
|
||||
|
||||
#define RDB_SAVE_INFO_INIT {-1}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Global server state
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -988,15 +1003,19 @@ struct redisServer {
|
||||
char *syslog_ident; /* Syslog ident */
|
||||
int syslog_facility; /* Syslog facility */
|
||||
/* Replication (master) */
|
||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
|
||||
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
|
||||
long long master_repl_offset; /* My current replication offset */
|
||||
long long second_replid_offset; /* Accept offsets up to this for replid2. */
|
||||
int slaveseldb; /* Last SELECTed DB in replication output */
|
||||
long long master_repl_offset; /* Global replication offset */
|
||||
int repl_ping_slave_period; /* Master pings the slave every N seconds */
|
||||
char *repl_backlog; /* Replication backlog for partial syncs */
|
||||
long long repl_backlog_size; /* Backlog circular buffer size */
|
||||
long long repl_backlog_histlen; /* Backlog actual data length */
|
||||
long long repl_backlog_idx; /* Backlog circular buffer current offset */
|
||||
long long repl_backlog_off; /* Replication offset of first byte in the
|
||||
backlog buffer. */
|
||||
long long repl_backlog_idx; /* Backlog circular buffer current offset,
|
||||
that is the next byte will'll write to.*/
|
||||
long long repl_backlog_off; /* Replication "master offset" of first
|
||||
byte in the replication backlog buffer.*/
|
||||
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
|
||||
gets released. */
|
||||
time_t repl_no_slaves_since; /* We have no slaves since that time.
|
||||
@ -1029,8 +1048,11 @@ struct redisServer {
|
||||
int slave_priority; /* Reported in INFO and used by Sentinel. */
|
||||
int slave_announce_port; /* Give the master this listening port. */
|
||||
char *slave_announce_ip; /* Give the master this ip address. */
|
||||
char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/
|
||||
long long repl_master_initial_offset; /* Master PSYNC offset. */
|
||||
/* The following two fields is where we store master PSYNC replid/offset
|
||||
* while the PSYNC is in progress. At the end we'll copy the fields into
|
||||
* the server->master client structure. */
|
||||
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
|
||||
long long master_initial_offset; /* Master PSYNC offset. */
|
||||
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
|
||||
/* Replication script cache. */
|
||||
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
|
||||
@ -1259,6 +1281,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void addReplyString(client *c, const char *s, size_t len);
|
||||
void addReplyBulk(client *c, robj *obj);
|
||||
void addReplyBulkCString(client *c, const char *s);
|
||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
||||
@ -1393,6 +1416,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
||||
|
||||
/* Replication */
|
||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
||||
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
|
||||
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
|
||||
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
|
||||
void replicationCron(void);
|
||||
@ -1414,6 +1438,9 @@ long long replicationGetSlaveOffset(void);
|
||||
char *replicationGetSlaveName(client *c);
|
||||
long long getPsyncInitialOffset(void);
|
||||
int replicationSetupSlaveForFullResync(client *slave, long long offset);
|
||||
void changeReplicationId(void);
|
||||
void clearReplicationId2(void);
|
||||
void chopReplicationBacklog(void);
|
||||
|
||||
/* Generic persistence functions */
|
||||
void startLoading(FILE *fp);
|
||||
@ -1422,7 +1449,7 @@ void stopLoading(void);
|
||||
|
||||
/* RDB persistence */
|
||||
#include "rdb.h"
|
||||
int rdbSaveRio(rio *rdb, int *error, int flags);
|
||||
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
|
||||
|
||||
/* AOF persistence */
|
||||
void flushAppendOnlyFile(int force);
|
||||
|
Reference in New Issue
Block a user