diff --git a/src/evict.c b/src/evict.c
index 773916ce..176f4c36 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -78,7 +78,7 @@ unsigned int getLRUClock(void) {
 unsigned int LRU_CLOCK(void) {
     unsigned int lruclock;
     if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
-        atomicGet(server.lruclock,lruclock);
+        lruclock = server.lruclock;
     } else {
         lruclock = getLRUClock();
     }
diff --git a/src/networking.c b/src/networking.c
index caffd3be..fd4e990f 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -35,6 +35,7 @@
 #include <ctype.h>
 
 static void setProtocolError(const char *errstr, client *c);
+int postponeClientRead(client *c);
 
 /* Return the size consumed from the allocator, for the specified SDS string,
  * including internal fragmentation. This function is used in order to compute
@@ -105,8 +106,7 @@ client *createClient(int fd) {
     }
 
     selectDb(c,0);
-    uint64_t client_id;
-    atomicGetIncr(server.next_client_id,client_id,1);
+    uint64_t client_id = ++server.next_client_id;
     c->id = client_id;
     c->resp = 2;
     c->fd = fd;
@@ -950,6 +950,14 @@ void unlinkClient(client *c) {
         c->flags &= ~CLIENT_PENDING_WRITE;
     }
 
+    /* Remove from the list of pending reads if needed. */
+    if (c->flags & CLIENT_PENDING_READ) {
+        ln = listSearchKey(server.clients_pending_read,c);
+        serverAssert(ln != NULL);
+        listDelNode(server.clients_pending_read,ln);
+        c->flags &= ~CLIENT_PENDING_READ;
+    }
+
     /* When client was just unblocked because of a blocking operation,
      * remove it from the list of unblocked clients. */
     if (c->flags & CLIENT_UNBLOCKED) {
@@ -1642,13 +1650,19 @@ void processInputBuffer(client *c) {
 }
 
 /* This is a wrapper for processInputBuffer that also cares about handling
- * the replication forwarding to the sub-slaves, in case the client 'c'
+ * the replication forwarding to the sub-replicas, in case the client 'c'
  * is flagged as master. Usually you want to call this instead of the
  * raw processInputBuffer(). */
 void processInputBufferAndReplicate(client *c) {
     if (!(c->flags & CLIENT_MASTER)) {
         processInputBuffer(c);
     } else {
+        /* If the client is a master we need to compute the difference
+         * between the applied offset before and after processing the buffer,
+         * to understand how much of the replication stream was actually
+         * applied to the master state: this quantity, and its corresponding
+         * part of the replication stream, will be propagated to the
+         * sub-replicas and to the replication backlog. */
         size_t prev_offset = c->reploff;
         processInputBuffer(c);
         size_t applied = c->reploff - prev_offset;
@@ -1667,6 +1681,10 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     UNUSED(el);
     UNUSED(mask);
 
+    /* Check if we want to read from the client later when exiting from
+     * the event loop. This is the case if threaded I/O is enabled. */
+    if (postponeClientRead(c)) return;
+
     readlen = PROTO_IOBUF_LEN;
     /* If this is a multi bulk request, and we are processing a bulk reply
      * that is large enough, try to maximize the probability that the query
@@ -1716,20 +1734,21 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
 
         bytes = sdscatrepr(bytes,c->querybuf,64);
-        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
+// FIXME: This may be called from an I/O thread and it is not safe to
+// log from there for now.
+// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
         sdsfree(ci);
         sdsfree(bytes);
         freeClient(c);
         return;
     }
 
-    /* Time to process the buffer. If the client is a master we need to
-     * compute the difference between the applied offset before and after
-     * processing the buffer, to understand how much of the replication stream
-     * was actually applied to the master state: this quantity, and its
-     * corresponding part of the replication stream, will be propagated to
-     * the sub-slaves and to the replication backlog. */
-    processInputBufferAndReplicate(c);
+    /* There is more data in the client input buffer, continue parsing it
+     * to check if there is a full command to execute.
+     * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means
+     * we are currently in the context of an I/O thread. */
+    if (!(c->flags & CLIENT_PENDING_READ))
+        processInputBufferAndReplicate(c);
 }
 
 void getClientsMaxBuffers(unsigned long *longest_output_list,
@@ -2566,7 +2585,9 @@ void stopThreadedIO(void) {
 
 /* This function checks if there are not enough pending clients to justify
  * keeping the I/O threads active: in that case I/O threads are stopped if
- * currently active.
+ * currently active. We track the pending writes as a measure of how many
+ * clients we need to handle in parallel; however the I/O threading is
+ * disabled globally for reads as well if we have too few pending clients.
  *
  * The function returns 0 if the I/O threading should be used because there
  * are enough active threads, otherwise 1 is returned and the I/O threads
@@ -2647,3 +2668,19 @@ int handleClientsWithPendingWritesUsingThreads(void) {
     listEmpty(server.clients_pending_write);
     return processed;
 }
+
+/* Return 1 if we want to handle the client read later using threaded I/O.
+ * This is called by the readable handler of the event loop.
+ * As a side effect of calling this function the client is put in the
+ * pending read clients list and flagged as such. */
+int postponeClientRead(client *c) {
+    if (io_threads_active &&
+        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
+    {
+        c->flags |= CLIENT_PENDING_READ;
+        listAddNodeHead(server.clients_pending_read,c);
+        return 1;
+    } else {
+        return 0;
+    }
+}
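Note on the flow above: readQueryFromClient() now returns immediately after queueing the client, and the consumer that drains server.clients_pending_read through the I/O threads is not part of this diff. The self-contained toy below (not Redis code: it replaces the adlist with a hand-rolled intrusive list and drops the CLIENT_MASTER/CLIENT_SLAVE exclusion) shows why the CLIENT_PENDING_READ flag matters: the first readable event queues the client exactly once, and a later invocation for the already-flagged client refuses to postpone, which is the moment the real socket read can happen in I/O thread context.

#include <stdio.h>

#define CLIENT_PENDING_READ (1<<29)

typedef struct client {
    int flags;
    struct client *next;        /* toy stand-in for Redis's adlist node */
} client;

static client *clients_pending_read = NULL; /* head of the pending list */
static int io_threads_active = 1;

/* Same shape as the patch: queue the client once and flag it, so a second
 * readable event for an already queued client does not enqueue it twice.
 * Returning 0 means "do not postpone": either threading is off, or the
 * flag is already set, i.e. we are being called to do the real read. */
static int postponeClientRead(client *c) {
    if (io_threads_active && !(c->flags & CLIENT_PENDING_READ)) {
        c->flags |= CLIENT_PENDING_READ;
        c->next = clients_pending_read;
        clients_pending_read = c;
        return 1;
    }
    return 0;
}

int main(void) {
    client c = {0, NULL};
    printf("%d\n", postponeClientRead(&c)); /* 1: postponed and queued */
    printf("%d\n", postponeClientRead(&c)); /* 0: already flagged, the read
                                               itself would happen now */
    return 0;
}

This also explains the CLIENT_PENDING_READ check at the end of readQueryFromClient(): when the flag is set we are inside an I/O thread, so only the socket read happens there, while parsing and command execution stay with the main thread.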
diff --git a/src/server.c b/src/server.c
index f6d2b47f..ef6b85c4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1728,16 +1728,17 @@ void databasesCron(void) {
  * every object access, and accuracy is not needed. To access a global var is
  * a lot faster than calling time(NULL) */
 void updateCachedTime(void) {
-    time_t unixtime = time(NULL);
-    atomicSet(server.unixtime,unixtime);
+    server.unixtime = time(NULL);
     server.mstime = mstime();
 
-    /* To get information about daylight saving time, we need to call localtime_r
-     * and cache the result. However calling localtime_r in this context is safe
-     * since we will never fork() while here, in the main thread. The logging
-     * function will call a thread safe version of localtime that has no locks. */
+    /* To get information about daylight saving time, we need to call
+     * localtime_r and cache the result. However calling localtime_r in this
+     * context is safe since we will never fork() while here, in the main
+     * thread. The logging function will call a thread safe version of
+     * localtime that has no locks. */
     struct tm tm;
-    localtime_r(&server.unixtime,&tm);
+    time_t ut = server.unixtime;
+    localtime_r(&ut,&tm);
     server.daylight_active = tm.tm_isdst;
 }
 
@@ -1807,8 +1808,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      *
      * Note that you can change the resolution altering the
      * LRU_CLOCK_RESOLUTION define. */
-    unsigned long lruclock = getLRUClock();
-    atomicSet(server.lruclock,lruclock);
+    server.lruclock = getLRUClock();
 
     /* Record the max memory used since the server was started. */
     if (zmalloc_used_memory() > server.stat_peak_memory)
@@ -2202,10 +2202,6 @@ void createSharedObjects(void) {
 void initServerConfig(void) {
     int j;
 
-    pthread_mutex_init(&server.next_client_id_mutex,NULL);
-    pthread_mutex_init(&server.lruclock_mutex,NULL);
-    pthread_mutex_init(&server.unixtime_mutex,NULL);
-
     updateCachedTime();
     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
@@ -2319,8 +2315,7 @@ void initServerConfig(void) {
     server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
     server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM;
 
-    unsigned int lruclock = getLRUClock();
-    atomicSet(server.lruclock,lruclock);
+    server.lruclock = getLRUClock();
     resetServerSaveParams();
 
     appendServerSaveParams(60*60,1);    /* save after 1 hour and 1 change */
@@ -2718,6 +2713,7 @@ void initServer(void) {
     server.slaves = listCreate();
    server.monitors = listCreate();
     server.clients_pending_write = listCreate();
+    server.clients_pending_read = listCreate();
     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
     server.unblocked_clients = listCreate();
     server.ready_keys = listCreate();
@@ -3821,8 +3817,6 @@ sds genRedisInfoString(char *section) {
             call_uname = 0;
         }
 
-        unsigned int lruclock;
-        atomicGet(server.lruclock,lruclock);
         info = sdscatprintf(info,
             "# Server\r\n"
             "redis_version:%s\r\n"
@@ -3866,7 +3860,7 @@ sds genRedisInfoString(char *section) {
             (intmax_t)(uptime/(3600*24)),
             server.hz,
             server.config_hz,
-            (unsigned long) lruclock,
+            (unsigned long) server.lruclock,
             server.executable ? server.executable : "",
             server.configfile ? server.configfile : "");
     }
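One subtlety in updateCachedTime() above: once server.unixtime becomes _Atomic (see the server.h hunk below), its address can no longer be passed to localtime_r(), which expects a plain const time_t *, so the value must first be loaded into a local. A minimal self-contained sketch of the same pattern, assuming POSIX localtime_r(); the variable name mirrors the patch but is local to this demo:

#define _POSIX_C_SOURCE 200809L /* for localtime_r() */
#include <stdio.h>
#include <time.h>
#include <stdatomic.h>

_Atomic time_t cached_unixtime; /* stands in for server.unixtime */

int main(void) {
    atomic_store(&cached_unixtime, time(NULL));

    /* localtime_r() takes a plain const time_t *: the address of an
     * _Atomic object cannot be passed to it, so load the value into a
     * local first, exactly as the patched updateCachedTime() does.
     * The plain assignment below is an atomic (seq_cst) load. */
    time_t ut = cached_unixtime;
    struct tm tm;
    localtime_r(&ut, &tm);
    printf("daylight saving active: %d\n", tm.tm_isdst);
    return 0;
}

Build with cc -std=c11; the copy also means the struct tm is computed from one coherent snapshot even if another thread stores a new time concurrently.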
diff --git a/src/server.h b/src/server.h
index 2e4de2bb..dcfcb55f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -285,6 +285,9 @@ typedef long long mstime_t; /* millisecond time type. */
 #define CLIENT_LUA_DEBUG_SYNC (1<<26)  /* EVAL debugging without fork() */
 #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
 #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
+#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
+                                       in the list of clients we can read
+                                       from. */
 
 /* Client block type (btype field in client structure)
  * if CLIENT_BLOCKED flag is set. */
@@ -1018,7 +1021,7 @@ struct redisServer {
     dict *commands;             /* Command table */
     dict *orig_commands;        /* Command table before command renaming. */
     aeEventLoop *el;
-    unsigned int lruclock;      /* Clock for LRU eviction */
+    _Atomic unsigned int lruclock; /* Clock for LRU eviction */
     int shutdown_asap;          /* SHUTDOWN needed ASAP */
     int activerehashing;        /* Incremental rehash in serverCron() */
     int active_defrag_running;  /* Active defragmentation running (holds current scan aggressiveness) */
@@ -1052,6 +1055,7 @@ struct redisServer {
     list *clients;              /* List of active clients */
     list *clients_to_close;     /* Clients to close asynchronously */
     list *clients_pending_write; /* There is to write or install handler. */
+    list *clients_pending_read;  /* Client has pending read socket buffers. */
     list *slaves, *monitors;    /* List of slaves and MONITORs */
     client *current_client;     /* Current client, only used on crash report */
     rax *clients_index;         /* Active clients dictionary by client ID. */
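These _Atomic qualifiers are what allow the patch to delete the three pthread mutexes at the bottom of the struct: in C11, loads, stores, and compound operations on an _Atomic object are atomic with sequentially consistent ordering, so the `++server.next_client_id` in createClient() needs no lock. A self-contained demonstration under that assumption (pthreads; the counter name mirrors the patched field but the program is not Redis code):

#include <stdio.h>
#include <stdint.h>
#include <stdatomic.h>
#include <pthread.h>

_Atomic uint64_t next_client_id = 0; /* mirrors the patched server field */

static void *worker(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; i++) {
        /* ++ on an _Atomic object is a single atomic read-modify-write,
         * the same operation createClient() now relies on. */
        uint64_t id = ++next_client_id;
        (void)id;
    }
    return NULL;
}

int main(void) {
    pthread_t t[4];
    for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
    /* Always 400000; with a plain uint64_t this would be a data race
     * and would typically print less. */
    printf("%llu\n", (unsigned long long)next_client_id);
    return 0;
}

Build with cc -std=c11 -pthread.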
@@ -1059,7 +1063,7 @@ struct redisServer {
     mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
     char neterr[ANET_ERR_LEN];   /* Error buffer for anet.c */
     dict *migrate_cached_sockets;/* MIGRATE cached sockets */
-    uint64_t next_client_id;    /* Next client unique ID. Incremental. */
+    _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
     int protected_mode;         /* Don't accept external connections. */
     int gopher_enabled;         /* If true the server will reply to gopher
                                    queries. Will still serve RESP2 queries. */
@@ -1104,8 +1108,8 @@ struct redisServer {
     long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
     unsigned long slowlog_max_len;     /* SLOWLOG max number of items logged */
     struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
-    long long stat_net_input_bytes; /* Bytes read from network. */
-    long long stat_net_output_bytes; /* Bytes written to network. */
+    _Atomic long long stat_net_input_bytes; /* Bytes read from network. */
+    _Atomic long long stat_net_output_bytes; /* Bytes written to network. */
     size_t stat_rdb_cow_bytes;      /* Copy on write bytes during RDB saving. */
     size_t stat_aof_cow_bytes;      /* Copy on write bytes during AOF rewrite. */
     /* The following two are used to track instantaneous metrics, like
@@ -1128,7 +1132,7 @@ struct redisServer {
     int active_defrag_cycle_min;       /* minimal effort for defrag in CPU percentage */
     int active_defrag_cycle_max;       /* maximal effort for defrag in CPU percentage */
     unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
-    size_t client_max_querybuf_len; /* Limit for client query buffer length */
+    _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */
     int dbnum;                      /* Total number of configured DBs */
     int supervised;                 /* 1 if supervised, 0 otherwise. */
     int supervised_mode;            /* See SUPERVISED_* */
@@ -1297,10 +1301,10 @@ struct redisServer {
     int list_max_ziplist_size;
     int list_compress_depth;
     /* time cache */
-    time_t unixtime;    /* Unix time sampled every cron cycle. */
-    time_t timezone;    /* Cached timezone. As set by tzset(). */
-    int daylight_active;    /* Currently in daylight saving time. */
-    long long mstime;   /* Like 'unixtime' but with milliseconds resolution. */
+    _Atomic time_t unixtime;    /* Unix time sampled every cron cycle. */
+    time_t timezone;            /* Cached timezone. As set by tzset(). */
+    int daylight_active;        /* Currently in daylight saving time. */
+    long long mstime;           /* 'unixtime' with milliseconds resolution. */
     /* Pubsub */
     dict *pubsub_channels;  /* Map channels to list of subscribed clients */
     list *pubsub_patterns;  /* A list of pubsub_patterns */
@@ -1360,12 +1364,6 @@ struct redisServer {
     int watchdog_period;  /* Software watchdog period in ms. 0 = off */
     /* System hardware info */
     size_t system_memory_size;  /* Total memory in system as reported by OS */
-
-    /* Mutexes used to protect atomic variables when atomic builtins are
-     * not available. */
-    pthread_mutex_t lruclock_mutex;
-    pthread_mutex_t next_client_id_mutex;
-    pthread_mutex_t unixtime_mutex;
 };
 
 typedef struct pubsubPattern {
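A closing note on cost: operations on _Atomic objects default to memory_order_seq_cst, the strongest (and on weakly ordered CPUs the most expensive) ordering. For pure statistics such as stat_net_input_bytes, where nothing else is synchronized through the counter, C11 also permits an explicitly relaxed read-modify-write. This is not what the patch does, only an available refinement; a sketch, with the field name borrowed from the patch but the program standalone:

#include <stdio.h>
#include <stdatomic.h>

_Atomic long long stat_net_input_bytes = 0; /* name borrowed from the patch */

static void track_read(size_t nread) {
    /* A compound assignment on an _Atomic object is a seq_cst RMW; for a
     * counter that synchronizes nothing else, a relaxed RMW is sufficient
     * and can be cheaper on weakly ordered architectures. */
    atomic_fetch_add_explicit(&stat_net_input_bytes, (long long)nread,
                              memory_order_relaxed);
}

int main(void) {
    track_read(128);
    track_read(64);
    printf("%lld bytes read\n", (long long)stat_net_input_bytes);
    return 0;
}

Whether the relaxed form is worth the extra verbosity for Redis is a judgment call; the seq_cst default the patch relies on is the simpler and always-correct choice.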