Threaded IO: read side WIP.

This commit is contained in:
antirez 2019-03-30 11:26:58 +01:00
parent 74591fb5bd
commit dd5b105c73
4 changed files with 75 additions and 46 deletions

View File

@ -78,7 +78,7 @@ unsigned int getLRUClock(void) {
unsigned int LRU_CLOCK(void) { unsigned int LRU_CLOCK(void) {
unsigned int lruclock; unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) { if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
atomicGet(server.lruclock,lruclock); lruclock = server.lruclock;
} else { } else {
lruclock = getLRUClock(); lruclock = getLRUClock();
} }

View File

@ -35,6 +35,7 @@
#include <ctype.h> #include <ctype.h>
static void setProtocolError(const char *errstr, client *c); static void setProtocolError(const char *errstr, client *c);
int postponeClientRead(client *c);
/* Return the size consumed from the allocator, for the specified SDS string, /* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute * including internal fragmentation. This function is used in order to compute
@ -105,8 +106,7 @@ client *createClient(int fd) {
} }
selectDb(c,0); selectDb(c,0);
uint64_t client_id; uint64_t client_id = ++server.next_client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id; c->id = client_id;
c->resp = 2; c->resp = 2;
c->fd = fd; c->fd = fd;
@ -950,6 +950,14 @@ void unlinkClient(client *c) {
c->flags &= ~CLIENT_PENDING_WRITE; c->flags &= ~CLIENT_PENDING_WRITE;
} }
/* Remove from the list of pending reads if needed. */
if (c->flags & CLIENT_PENDING_READ) {
ln = listSearchKey(server.clients_pending_read,c);
serverAssert(ln != NULL);
listDelNode(server.clients_pending_read,ln);
c->flags &= ~CLIENT_PENDING_READ;
}
/* When client was just unblocked because of a blocking operation, /* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */ * remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) { if (c->flags & CLIENT_UNBLOCKED) {
@ -1642,13 +1650,19 @@ void processInputBuffer(client *c) {
} }
/* This is a wrapper for processInputBuffer that also cares about handling /* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-slaves, in case the client 'c' * the replication forwarding to the sub-replicas, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the * is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */ * raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) { void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) { if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c); processInputBuffer(c);
} else { } else {
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
size_t prev_offset = c->reploff; size_t prev_offset = c->reploff;
processInputBuffer(c); processInputBuffer(c);
size_t applied = c->reploff - prev_offset; size_t applied = c->reploff - prev_offset;
@ -1667,6 +1681,10 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el); UNUSED(el);
UNUSED(mask); UNUSED(mask);
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
readlen = PROTO_IOBUF_LEN; readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply /* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query * that is large enough, try to maximize the probability that the query
@ -1716,20 +1734,21 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64); bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); // FIXME: This may be called from an I/O thread and it is not safe to
// log from there for now.
// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci); sdsfree(ci);
sdsfree(bytes); sdsfree(bytes);
freeClient(c); freeClient(c);
return; return;
} }
/* Time to process the buffer. If the client is a master we need to /* There is more data in the client input buffer, continue parsing it
* compute the difference between the applied offset before and after * in case to check if there is a full command to execute.
* processing the buffer, to understand how much of the replication stream * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means
* was actually applied to the master state: this quantity, and its * we are currently in the context of an I/O thread. */
* corresponding part of the replication stream, will be propagated to if (!(c->flags & CLIENT_PENDING_READ))
* the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c);
processInputBufferAndReplicate(c);
} }
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,
@ -2566,7 +2585,9 @@ void stopThreadedIO(void) {
/* This function checks if there are not enough pending clients to justify /* This function checks if there are not enough pending clients to justify
* taking the I/O threads active: in that case I/O threads are stopped if * taking the I/O threads active: in that case I/O threads are stopped if
* currently active. * currently active. We track the pending writes as a measure of clients
* we need to handle in parallel, however the I/O threading is disabled
globally for reads as well if we have too few pending clients.
* *
* The function returns 0 if the I/O threading should be used because there * are enough active threads, otherwise 1 is returned and the I/O threads
* are enough active threads, otherwise 1 is returned and the I/O threads * are enough active threads, otherwise 1 is returned and the I/O threads
@ -2647,3 +2668,19 @@ int handleClientsWithPendingWritesUsingThreads(void) {
listEmpty(server.clients_pending_write); listEmpty(server.clients_pending_write);
return processed; return processed;
} }
/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such.
 *
 * Returns 0 (read immediately) when I/O threads are inactive, or when the
 * client is a master/replica link or is already queued for a pending read.
 * NOTE(review): master and replica links are excluded here — presumably
 * their replication stream must be processed synchronously in the main
 * thread; confirm against the replication code. */
int postponeClientRead(client *c) {
    if (io_threads_active &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        /* Flag first so a re-entrant call for the same client takes the
         * else branch instead of queueing it twice. */
        c->flags |= CLIENT_PENDING_READ;
        /* The queued clients are read later, when exiting from the event
         * loop (see the check added at the top of readQueryFromClient). */
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

View File

@ -1728,16 +1728,17 @@ void databasesCron(void) {
* every object access, and accuracy is not needed. To access a global var is * every object access, and accuracy is not needed. To access a global var is
* a lot faster than calling time(NULL) */ * a lot faster than calling time(NULL) */
void updateCachedTime(void) { void updateCachedTime(void) {
time_t unixtime = time(NULL); server.unixtime = time(NULL);
atomicSet(server.unixtime,unixtime);
server.mstime = mstime(); server.mstime = mstime();
/* To get information about daylight saving time, we need to call localtime_r /* To get information about daylight saving time, we need to call
* and cache the result. However calling localtime_r in this context is safe * localtime_r and cache the result. However calling localtime_r in this
* since we will never fork() while here, in the main thread. The logging * context is safe since we will never fork() while here, in the main
* function will call a thread safe version of localtime that has no locks. */ * thread. The logging function will call a thread safe version of
* localtime that has no locks. */
struct tm tm; struct tm tm;
localtime_r(&server.unixtime,&tm); time_t ut = server.unixtime;
localtime_r(&ut,&tm);
server.daylight_active = tm.tm_isdst; server.daylight_active = tm.tm_isdst;
} }
@ -1807,8 +1808,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* *
* Note that you can change the resolution altering the * Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */ * LRU_CLOCK_RESOLUTION define. */
unsigned long lruclock = getLRUClock(); server.lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
/* Record the max memory used since the server was started. */ /* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory) if (zmalloc_used_memory() > server.stat_peak_memory)
@ -2202,10 +2202,6 @@ void createSharedObjects(void) {
void initServerConfig(void) { void initServerConfig(void) {
int j; int j;
pthread_mutex_init(&server.next_client_id_mutex,NULL);
pthread_mutex_init(&server.lruclock_mutex,NULL);
pthread_mutex_init(&server.unixtime_mutex,NULL);
updateCachedTime(); updateCachedTime();
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0'; server.runid[CONFIG_RUN_ID_SIZE] = '\0';
@ -2319,8 +2315,7 @@ void initServerConfig(void) {
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM; server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM;
unsigned int lruclock = getLRUClock(); server.lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
resetServerSaveParams(); resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@ -2718,6 +2713,7 @@ void initServer(void) {
server.slaves = listCreate(); server.slaves = listCreate();
server.monitors = listCreate(); server.monitors = listCreate();
server.clients_pending_write = listCreate(); server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate(); server.unblocked_clients = listCreate();
server.ready_keys = listCreate(); server.ready_keys = listCreate();
@ -3821,8 +3817,6 @@ sds genRedisInfoString(char *section) {
call_uname = 0; call_uname = 0;
} }
unsigned int lruclock;
atomicGet(server.lruclock,lruclock);
info = sdscatprintf(info, info = sdscatprintf(info,
"# Server\r\n" "# Server\r\n"
"redis_version:%s\r\n" "redis_version:%s\r\n"
@ -3866,7 +3860,7 @@ sds genRedisInfoString(char *section) {
(intmax_t)(uptime/(3600*24)), (intmax_t)(uptime/(3600*24)),
server.hz, server.hz,
server.config_hz, server.config_hz,
(unsigned long) lruclock, (unsigned long) server.lruclock,
server.executable ? server.executable : "", server.executable ? server.executable : "",
server.configfile ? server.configfile : ""); server.configfile ? server.configfile : "");
} }

View File

@ -285,6 +285,9 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
/* Client block type (btype field in client structure) /* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */ * if CLIENT_BLOCKED flag is set. */
@ -1018,7 +1021,7 @@ struct redisServer {
dict *commands; /* Command table */ dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */ dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el; aeEventLoop *el;
unsigned int lruclock; /* Clock for LRU eviction */ _Atomic unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */ int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */ int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@ -1052,6 +1055,7 @@ struct redisServer {
list *clients; /* List of active clients */ list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */ list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client, only used on crash report */ client *current_client; /* Current client, only used on crash report */
rax *clients_index; /* Active clients dictionary by client ID. */ rax *clients_index; /* Active clients dictionary by client ID. */
@ -1059,7 +1063,7 @@ struct redisServer {
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */
uint64_t next_client_id; /* Next client unique ID. Incremental. */ _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */ int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */ queries. Will still serve RESP2 queries. */
@ -1104,8 +1108,8 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
long long stat_net_input_bytes; /* Bytes read from network. */ _Atomic long long stat_net_input_bytes; /* Bytes read from network. */
long long stat_net_output_bytes; /* Bytes written to network. */ _Atomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
/* The following two are used to track instantaneous metrics, like /* The following two are used to track instantaneous metrics, like
@ -1128,7 +1132,7 @@ struct redisServer {
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */ _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */ int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */ int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */ int supervised_mode; /* See SUPERVISED_* */
@ -1297,10 +1301,10 @@ struct redisServer {
int list_max_ziplist_size; int list_max_ziplist_size;
int list_compress_depth; int list_compress_depth;
/* time cache */ /* time cache */
time_t unixtime; /* Unix time sampled every cron cycle. */ _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */ time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */ int daylight_active; /* Currently in daylight saving time. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ long long mstime; /* 'unixtime' with milliseconds resolution. */
/* Pubsub */ /* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */ list *pubsub_patterns; /* A list of pubsub_patterns */
@ -1360,12 +1364,6 @@ struct redisServer {
int watchdog_period; /* Software watchdog period in ms. 0 = off */ int watchdog_period; /* Software watchdog period in ms. 0 = off */
/* System hardware info */ /* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */ size_t system_memory_size; /* Total memory in system as reported by OS */
/* Mutexes used to protect atomic variables when atomic builtins are
* not available. */
pthread_mutex_t lruclock_mutex;
pthread_mutex_t next_client_id_mutex;
pthread_mutex_t unixtime_mutex;
}; };
typedef struct pubsubPattern { typedef struct pubsubPattern {