diff --git a/src/aof.c b/src/aof.c index 4a7d749d..9fa98e9d 100644 --- a/src/aof.c +++ b/src/aof.c @@ -197,10 +197,35 @@ ssize_t aofRewriteBufferWrite(int fd) { * AOF file implementation * ------------------------------------------------------------------------- */ +/* Return true if an AOF fsync is already in progress in a + * BIO thread. */ +int aofFsyncInProgress(void) { + return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; +} + /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ -void aof_background_fsync(int fd) { - bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); +void aofStartBackgroundFsync(void) { + if (aofFsyncInProgress()) { + /* We never want to start another fsync if one is in progress. */ + return; + } + + /* No fsync is in progress. If there was one, the new epoch, now that it + * terminated, is stored in server.aof_fsync_in_progress_epoch. So + * update the current fsync epoch. */ + server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch; + bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL); + server.aof_fsync_in_progress_epoch++; + handleClientsBlockedForAOF(); /* Unblock clients if we can. */ +} + +/* Returns an AOF epoch so that, when such epoch is reached by + * server.aof_fsync_epoch, it means that everything that was written in the + * AOF up to the moment this function returned the epoch, is now flushed + * on disk. */ +uint64_t aofNextEpoch(void) { + return server.aof_fsync_in_progress_epoch + 1; } /* Kills an AOFRW child process if exists */ @@ -336,7 +361,7 @@ void flushAppendOnlyFile(int force) { if (sdslen(server.aof_buf) == 0) return; if (server.aof_fsync == AOF_FSYNC_EVERYSEC) - sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; + sync_in_progress = aofFsyncInProgress(); if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. 
@@ -485,7 +510,11 @@ void flushAppendOnlyFile(int force) { server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { - if (!sync_in_progress) aof_background_fsync(server.aof_fd); + aofStartBackgroundFsync(); + /* Note that we update this time regardless of whether fsync + * was actually started or not. server.aof_last_fsync is not + * used to really know when the latest fsync succeeded, but just + * as a timer to *try* to fsync once every second. */ server.aof_last_fsync = server.unixtime; } } @@ -1608,10 +1637,11 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* AOF enabled, replace the old fd with the new one. */ oldfd = server.aof_fd; server.aof_fd = newfd; - if (server.aof_fsync == AOF_FSYNC_ALWAYS) + if (server.aof_fsync == AOF_FSYNC_ALWAYS) { aof_fsync(newfd); - else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) - aof_background_fsync(newfd); + } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) { + aofStartBackgroundFsync(); + } server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size;