Threaded IO: bugfix #6988 process events while blocked

This commit is contained in:
zhaozhao.zz 2020-03-15 21:49:10 +08:00
parent 453e01a091
commit 606a01df70

View File

@ -36,6 +36,7 @@
static void setProtocolError(const char *errstr, client *c); static void setProtocolError(const char *errstr, client *c);
int postponeClientRead(client *c); int postponeClientRead(client *c);
int process_while_blocked;
/* Return the size consumed from the allocator, for the specified SDS string, /* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute * including internal fragmentation. This function is used in order to compute
@ -2738,6 +2739,7 @@ int clientsArePaused(void) {
int processEventsWhileBlocked(void) { int processEventsWhileBlocked(void) {
int iterations = 4; /* See the function top-comment. */ int iterations = 4; /* See the function top-comment. */
int count = 0; int count = 0;
process_while_blocked = 1;
while (iterations--) { while (iterations--) {
int events = 0; int events = 0;
events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
@ -2745,6 +2747,7 @@ int processEventsWhileBlocked(void) {
if (!events) break; if (!events) break;
count += events; count += events;
} }
process_while_blocked = 0;
return count; return count;
} }
@ -2816,6 +2819,7 @@ void *IOThreadMain(void *myid) {
/* Initialize the data structures needed for threaded I/O. */ /* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) { void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */ io_threads_active = 0; /* We start with threads not active. */
process_while_blocked = 0;
/* Don't spawn any thread if the user selected a single thread: /* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */ * we'll handle I/O directly from the main thread. */
@ -2970,6 +2974,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
int postponeClientRead(client *c) { int postponeClientRead(client *c) {
if (io_threads_active && if (io_threads_active &&
server.io_threads_do_reads && server.io_threads_do_reads &&
!process_while_blocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{ {
c->flags |= CLIENT_PENDING_READ; c->flags |= CLIENT_PENDING_READ;