diff --git a/src/networking.c b/src/networking.c index aeaeca96..871da626 100644 --- a/src/networking.c +++ b/src/networking.c @@ -871,9 +871,17 @@ void freeClient(client *c) { * a context where calling freeClient() is not possible, because the client * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { + /* We need to handle concurrent access to the server.clients_to_close list + * only in the freeClientAsync() function, since it's the only function that + * may access the list while Redis uses I/O threads. All the other accesses + * are in the context of the main thread while the other threads are + * idle. */ + static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; + pthread_mutex_lock(&async_free_queue_mutex); listAddNodeTail(server.clients_to_close,c); + pthread_mutex_unlock(&async_free_queue_mutex); } void freeClientsInAsyncFreeQueue(void) { @@ -888,7 +896,12 @@ void freeClientsInAsyncFreeQueue(void) { } /* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed. */ + * is still valid after the call, C_ERR if it was freed because of some + * error. + * + * This function is called by threads, but always with handler_installed + * set to 0. So when handler_installed is set to 0 the function must be + * thread safe. */ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; @@ -944,14 +957,15 @@ int writeToClient(int fd, client *c, int handler_installed) { (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break; } + /* FIXME: Fixme, use atomic var for this. */ server.stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { - serverLog(LL_VERBOSE, - "Error writing to client: %s", strerror(errno)); - freeClient(c); + // serverLog(LL_VERBOSE, + // "Error writing to client: %s", strerror(errno)); + freeClientAsync(c); return C_ERR; } } @@ -964,11 +978,15 @@ int writeToClient(int fd, client *c, int handler_installed) { } if (!clientHasPendingReplies(c)) { c->sentlen = 0; + /* Note that writeToClient() is called in a threaded way, but + * adDeleteFileEvent() is not thread safe: however writeToClient() + * is always called with handler_installed set to 0 from threads + * so we are fine. */ if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - freeClient(c); + freeClientAsync(c); return C_ERR; } } @@ -2020,3 +2038,131 @@ int processEventsWhileBlocked(void) { } return count; } + +/* ============================================================================= + * Threaded I/O + * =========================================================================== */ + +#define SERVER_MAX_IO_THREADS 32 + +pthread_t io_threads[SERVER_MAX_IO_THREADS]; +pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER; +int io_threads_done = 0; /* Number of threads that completed the work. */ +int io_threads_idle = 0; /* Number of threads in idle state ready to go. */ +list *io_threads_list[SERVER_MAX_IO_THREADS]; + +void *IOThreadMain(void *myid) { + /* The ID is the thread number (from 0 to server.iothreads_num-1), and is + * used by the thread to just manipulate a single sub-array of clients. */ + long id = (unsigned long)myid; + + while(1) { + /* ... Wait for start ... */ + pthread_mutex_lock(&io_threads_idle_mutex); + io_threads_idle++; + pthread_cond_signal(&io_threads_idle_cond); + printf("[%ld] Waiting start...\n", id); + pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex); + printf("[%ld] Started\n", id); + pthread_mutex_unlock(&io_threads_idle_mutex); + printf("%d to handle\n", (int)listLength(io_threads_list[id])); + + /* ... Process ... */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + } + listEmpty(io_threads_list[id]); + + /* Report success. */ + pthread_mutex_lock(&io_threads_done_mutex); + io_threads_done++; + pthread_cond_signal(&io_threads_done_cond); + pthread_mutex_unlock(&io_threads_done_mutex); + printf("[%ld] Done\n", id); + } +} + +/* Initialize the data structures needed for threaded I/O. */ +void initThreadedIO(void) { + pthread_t tid; + + server.io_threads_num = 4; + for (int i = 0; i < server.io_threads_num; i++) { + if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { + serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); + exit(1); + } + io_threads[i] = tid; + io_threads_list[i] = listCreate(); + } +} + +int handleClientsWithPendingWritesUsingThreads(void) { + int processed = listLength(server.clients_pending_write); + if (processed == 0) return 0; /* Return ASAP if there are no clients. */ + + printf("%d TOTAL\n", processed); + + /* Wait for all threads to be ready. */ + pthread_mutex_lock(&io_threads_idle_mutex); + while(io_threads_idle < server.io_threads_num) { + pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex); + } + printf("All threads are idle: %d\n", io_threads_idle); + io_threads_idle = 0; + pthread_mutex_unlock(&io_threads_idle_mutex); + + /* Distribute the clients across N different lists. */ + listIter li; + listNode *ln; + listRewind(server.clients_pending_write,&li); + int item_id = 0; + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flags &= ~CLIENT_PENDING_WRITE; + int target_id = item_id % server.io_threads_num; + listAddNodeTail(io_threads_list[target_id],c); + item_id++; + } + + /* Start all threads. */ + printf("Send start condition\n"); + pthread_mutex_lock(&io_threads_done_mutex); + io_threads_done = 0; + pthread_cond_broadcast(&io_threads_start_cond); + pthread_mutex_unlock(&io_threads_done_mutex); + + /* Wait for all threads to end their work. */ + pthread_mutex_lock(&io_threads_done_mutex); + while(io_threads_done < server.io_threads_num) { + pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex); + } + pthread_mutex_unlock(&io_threads_done_mutex); + printf("All threads finshed\n"); + + /* Run the list of clients again to install the write handler where + * needed. */ + listRewind(server.clients_pending_write,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + + /* Install the write handler if there are pending writes in some + * of the clients. */ + if (clientHasPendingReplies(c) && + aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, + sendReplyToClient, c) == AE_ERR) + { + freeClientAsync(c); + } + } + listEmpty(server.clients_pending_write); + return processed; +} diff --git a/src/server.c b/src/server.c index 61291fde..aaf34cb8 100644 --- a/src/server.c +++ b/src/server.c @@ -1129,9 +1129,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(); - /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don't check return value, just use the side effect.*/ @@ -1225,7 +1222,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - handleClientsWithPendingWrites(); + /* XXX: Put a condition based on number of waiting clients: if we + * have less than a given number of clients, use non threaded code. */ + handleClientsWithPendingWritesUsingThreads(); + + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -1948,6 +1950,7 @@ void initServer(void) { slowlogInit(); latencyMonitorInit(); bioInit(); + initThreadedIO(); server.initial_memory_usage = zmalloc_used_memory(); } diff --git a/src/server.h b/src/server.h index e3b56075..bbc8dce1 100644 --- a/src/server.h +++ b/src/server.h @@ -922,6 +922,8 @@ struct redisServer { dict *migrate_cached_sockets;/* MIGRATE cached sockets */ uint64_t next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ + int io_threads_num; /* Number of IO threads to use. */ + /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -1382,9 +1384,11 @@ void pauseClients(mstime_t duration); int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); +int handleClientsWithPendingWritesUsingThreads(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); +void initThreadedIO(void); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...)