WAIT AOF WIP #1.

This commit is contained in:
antirez
2018-02-27 19:17:02 +01:00
parent 92696e49d2
commit 913d600033

View File

@ -197,10 +197,35 @@ ssize_t aofRewriteBufferWrite(int fd) {
* AOF file implementation * AOF file implementation
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
/* Return true if an AOf fsync is currently already in progress in a
* BIO thread. */
int aofFsyncInProgress(void) {
return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
/* Starts a background task that performs fsync() against the specified /* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */ * file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) { void aofStartBackgroundFsync(void) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); if (aofFsyncInProgress()) {
/* We never want to start another fsync if one is in progress. */
return;
}
/* No fsync is in progress. If there was one, the new epoch, now that it
* termianted, is stored in server.aof_fsync_in_progress_epoch. So
* update the current fsync epoch. */
server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch;
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL);
server.aof_fsync_in_progress_epoch++;
handleClientsBlockedForAOF(); /* Unblock clients if we can. */
}
/* Returns an AOF epoch so that, when such epoch is reached by
* server.aof_fsync_epoch, it means that everything that was written in the
* AOF up to the moment this function returned the epoch, is now flushed
* on disk. */
uint64_t aofNextEpoch(void) {
return server.aof_fsync_in_progress_epoch + 1;
} }
/* Kills an AOFRW child process if exists */ /* Kills an AOFRW child process if exists */
@ -336,7 +361,7 @@ void flushAppendOnlyFile(int force) {
if (sdslen(server.aof_buf) == 0) return; if (sdslen(server.aof_buf) == 0) return;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC) if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing. /* With this append fsync policy we do background fsyncing.
@ -485,7 +510,11 @@ void flushAppendOnlyFile(int force) {
server.aof_last_fsync = server.unixtime; server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) { server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd); aofStartBackgroundFsync();
/* Note that we update this time regardless of the fact fsync
* was actually started or nmot. server.aof_last_fsync is not
* used to really know when the latest fsync succeeded, but just
* as a timer to *try* to fsync once every second. */
server.aof_last_fsync = server.unixtime; server.aof_last_fsync = server.unixtime;
} }
} }
@ -1608,10 +1637,11 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* AOF enabled, replace the old fd with the new one. */ /* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd; oldfd = server.aof_fd;
server.aof_fd = newfd; server.aof_fd = newfd;
if (server.aof_fsync == AOF_FSYNC_ALWAYS) if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
aof_fsync(newfd); aof_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {
aof_background_fsync(newfd); aofStartBackgroundFsync();
}
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize(); aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size; server.aof_rewrite_base_size = server.aof_current_size;