WAIT AOF WIP #2. Feature mostly implemented.

This commit is contained in:
antirez 2018-02-28 12:25:59 +01:00
parent 913d600033
commit d69fc0d76a
6 changed files with 85 additions and 34 deletions

View File

@ -211,13 +211,18 @@ void aofStartBackgroundFsync(void) {
return; return;
} }
/* No fsync is in progress. If there was one, the new epoch, now that it /* Before starting a new fsync, we need to flush the AOF buffer to
* terminated, is stored in server.aof_fsync_in_progress_epoch. So * disk if there are clients blocked in WAIT AOF, otherwise we are
* update the current fsync epoch. */ * not going to sync their data. */
if (server.blocked_clients_by_type[BLOCKED_AOF])
flushAppendOnlyFile(0);
/* No fsync is in progress. If there was one, the new epoch is stored
* in server.aof_fsync_in_progress_epoch. So update the current fsync
* epoch with the one of the fsync in progress. */
server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch; server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch;
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL); bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL);
server.aof_fsync_in_progress_epoch++; server.aof_fsync_in_progress_epoch++;
handleClientsBlockedForAOF(); /* Unblock clients if we can. */
} }
/* Returns an AOF epoch so that, when such epoch is reached by /* Returns an AOF epoch so that, when such epoch is reached by

View File

@ -139,6 +139,8 @@ void unblockClient(client *c) {
unblockClientWaitingData(c); unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) { } else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c); unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_AOF) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) { } else if (c->btype == BLOCKED_MODULE) {
unblockClientFromModule(c); unblockClientFromModule(c);
} else { } else {
@ -166,6 +168,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReply(c,shared.nullmultibulk); addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) { } else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else if (c->btype == BLOCKED_AOF) {
addReply(c,shared.czero);
} else if (c->btype == BLOCKED_MODULE) { } else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c); moduleBlockedClientTimedOut(c);
} else { } else {

View File

@ -138,6 +138,7 @@ client *createClient(int fd) {
c->bpop.target = NULL; c->bpop.target = NULL;
c->bpop.xread_group = NULL; c->bpop.xread_group = NULL;
c->bpop.numreplicas = 0; c->bpop.numreplicas = 0;
c->bpop.aofepoch = 0;
c->bpop.reploffset = 0; c->bpop.reploffset = 0;
c->woff = 0; c->woff = 0;
c->watched_keys = listCreate(); c->watched_keys = listCreate();

View File

@ -2394,15 +2394,22 @@ void waitCommand(client *c) {
mstime_t timeout; mstime_t timeout;
long numreplicas, ackreplicas; long numreplicas, ackreplicas;
long long offset = c->woff; long long offset = c->woff;
int waitaof = 0; /* True if the user requested to wait for AOF sync. */
if (server.masterhost) { if (server.masterhost) {
addReplyError(c,"WAIT cannot be used with slave instances. Please also note that since Redis 4.0 if a slave is configured to be writable (which is not the default) writes to slaves are just local and are not propagated."); addReplyError(c,"WAIT cannot be used with slave instances. Please also note that since Redis 4.0 if a slave is configured to be writable (which is not the default) writes to slaves are just local and are not propagated.");
return; return;
} }
/* Argument parsing. */ /* AOF or number of replicas argument parsing. */
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) if (!strcasecmp(c->argv[1]->ptr,"AOF")) {
return; waitaof = 1;
} else {
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
return;
}
/* Timeout parsing. */
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= C_OK) return; != C_OK) return;
@ -2415,11 +2422,17 @@ void waitCommand(client *c) {
/* Otherwise block the client and put it into our list of clients /* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */ * waiting for ack from slaves. */
c->bpop.timeout = timeout; if (waitaof) {
c->bpop.reploffset = offset; c->bpop.aofepoch = aofNextEpoch();
c->bpop.numreplicas = numreplicas; listAddNodeTail(server.clients_waiting_acks,c);
listAddNodeTail(server.clients_waiting_acks,c); blockClient(c,BLOCKED_AOF);
blockClient(c,BLOCKED_WAIT); } else {
c->bpop.timeout = timeout;
c->bpop.reploffset = offset;
c->bpop.numreplicas = numreplicas;
listAddNodeTail(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
}
/* Make sure that the server will send an ACK request to all the slaves /* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */ * before returning to the event loop. */
@ -2438,7 +2451,7 @@ void unblockClientWaitingReplicas(client *c) {
/* Check if there are clients blocked in WAIT that can be unblocked since /* Check if there are clients blocked in WAIT that can be unblocked since
* we received enough ACKs from slaves. */ * we received enough ACKs from slaves. */
void processClientsWaitingReplicas(void) { void processClientsBlockedInWait(void) {
long long last_offset = 0; long long last_offset = 0;
int last_numreplicas = 0; int last_numreplicas = 0;
@ -2449,26 +2462,44 @@ void processClientsWaitingReplicas(void) {
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *c = ln->value; client *c = ln->value;
/* Every time we find a client that is satisfied for a given if (c->btype == BLOCKED_AOF) {
* offset and number of replicas, we remember it so the next client /* Handle WAIT AOF. */
* may be unblocked without calling replicationCountAcksByOffset() if (server.aof_fsync == AOF_FSYNC_ALWAYS ||
* if the requested offset / replicas were equal or less. */ c->bpop.aofepoch <= server.aof_fsync_epoch)
if (last_offset && last_offset > c->bpop.reploffset && {
last_numreplicas > c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c); unblockClient(c);
addReplyLongLong(c,numreplicas); addReply(c,shared.cone);
}
} else {
/* Handle WAIT for slaves to ACK.
*
* Every time we find a client that is satisfied for a given
* offset and number of replicas, we remember it so the next client
* may be unblocked without calling replicationCountAcksByOffset()
* if the requested offset / replicas were equal or less. */
if (last_offset && last_offset > c->bpop.reploffset &&
last_numreplicas > c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas =
replicationCountAcksByOffset(c->bpop.reploffset);
if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
}
} }
} }
} }
/* If after this cycle we have still clients blocked, try to start
* a new AOF fsync. If one is already in progress nothing will happen. */
if (server.blocked_clients_by_type[BLOCKED_AOF])
aofStartBackgroundFsync();
} }
/* Return the slave replication offset for this instance, that is /* Return the slave replication offset for this instance, that is

View File

@ -1229,9 +1229,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
} }
/* Unblock all the clients blocked for synchronous replication /* Unblock all the clients blocked for synchronous replication
* in WAIT. */ * or AOF sync in WAIT. */
if (listLength(server.clients_waiting_acks)) if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas(); processClientsBlockedInWait();
/* Check if there are clients unblocked by modules that implement /* Check if there are clients unblocked by modules that implement
* blocking commands. */ * blocking commands. */
@ -1418,6 +1418,8 @@ void initServerConfig(void) {
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.aof_fsync_in_progress_epoch = 0;
server.aof_fsync_epoch = 0;
server.pidfile = NULL; server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);

View File

@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */ #define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_NUM 5 /* Number of blocked states. */ #define BLOCKED_AOF 5 /* WAIT AOF waiting for AOF sync. */
#define BLOCKED_NUM 6 /* Number of blocked states. */
/* Client request types */ /* Client request types */
#define PROTO_REQ_INLINE 1 #define PROTO_REQ_INLINE 1
@ -660,6 +661,9 @@ typedef struct blockingState {
int numreplicas; /* Number of replicas we are waiting for ACK. */ int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */ long long reploffset; /* Replication offset to reach. */
/* BLOCKED_AOF */
uint64_t aofepoch; /* WAIT AOF: the sync epoch we are waiting for. */
/* BLOCKED_MODULE */ /* BLOCKED_MODULE */
void *module_blocked_handle; /* RedisModuleBlockedClient structure. void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only which is opaque for the Redis core, only
@ -1016,7 +1020,7 @@ struct redisServer {
int aof_fd; /* File descriptor of currently selected AOF file */ int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */ int aof_selected_db; /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_last_fsync; /* UNIX time of last fsync() *attempt* */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* C_OK or C_ERR */ int aof_lastbgrewrite_status; /* C_OK or C_ERR */
@ -1026,6 +1030,8 @@ struct redisServer {
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
uint64_t aof_fsync_epoch; /* AOF sync epoch used for WAIT AOF. */
uint64_t aof_fsync_in_progress_epoch; /* Current ongoing AOF sync epoch. */
/* AOF pipes used to communicate between parent and child during rewrite. */ /* AOF pipes used to communicate between parent and child during rewrite. */
int aof_pipe_write_data_to_child; int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent; int aof_pipe_read_data_from_parent;
@ -1513,7 +1519,7 @@ void replicationScriptCacheInit(void);
void replicationScriptCacheFlush(void); void replicationScriptCacheFlush(void);
void replicationScriptCacheAdd(sds sha1); void replicationScriptCacheAdd(sds sha1);
int replicationScriptCacheExists(sds sha1); int replicationScriptCacheExists(sds sha1);
void processClientsWaitingReplicas(void); void processClientsBlockedInWait(void);
void unblockClientWaitingReplicas(client *c); void unblockClientWaitingReplicas(client *c);
int replicationCountAcksByOffset(long long offset); int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void); void replicationSendNewlineToMaster(void);
@ -1548,6 +1554,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal);
void aofRewriteBufferReset(void); void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void); unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void); ssize_t aofReadDiffFromParent(void);
void aofStartBackgroundFsync(void);
uint64_t aofNextEpoch(void);
/* Child info */ /* Child info */
void openChildInfoPipe(void); void openChildInfoPipe(void);