ae.c now supports multiple polling API modules, even if only ae_select.c is implemented currently. Also adding and removing an event is now O(1).

This commit is contained in:
antirez
2009-11-23 18:50:39 +01:00
parent 5b2a1c292a
commit 266373b283
6 changed files with 147 additions and 126 deletions

144
ae.c
View File

@ -38,20 +38,39 @@
#include "ae.h"
#include "zmalloc.h"
#include "config.h"
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#include "ae_select.c"
#endif
aeEventLoop *aeCreateEventLoop(void) {
aeEventLoop *eventLoop;
int i;
eventLoop = zmalloc(sizeof(*eventLoop));
if (!eventLoop) return NULL;
eventLoop->fileEventHead = NULL;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
if (aeApiCreate(eventLoop) == -1) {
zfree(eventLoop);
return NULL;
}
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < AE_SETSIZE; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop);
}
@ -60,42 +79,39 @@ void aeStop(aeEventLoop *eventLoop) {
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
aeFileProc *proc, void *clientData)
{
aeFileEvent *fe;
if (fd >= AE_SETSIZE) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
fe = zmalloc(sizeof(*fe));
if (fe == NULL) return AE_ERR;
fe->fd = fd;
fe->mask = mask;
fe->fileProc = proc;
fe->finalizerProc = finalizerProc;
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
if (mask & AE_EXCEPTION) fe->efileProc = proc;
fe->clientData = clientData;
fe->next = eventLoop->fileEventHead;
eventLoop->fileEventHead = fe;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
aeFileEvent *fe, *prev = NULL;
if (fd >= AE_SETSIZE) return;
aeFileEvent *fe = &eventLoop->events[fd];
fe = eventLoop->fileEventHead;
while(fe) {
if (fe->fd == fd && fe->mask == mask) {
if (prev == NULL)
eventLoop->fileEventHead = fe->next;
else
prev->next = fe->next;
if (fe->finalizerProc)
fe->finalizerProc(eventLoop, fe->clientData);
zfree(fe);
return;
}
prev = fe;
fe = fe->next;
if (fe->mask == AE_NONE) return;
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
aeApiDelEvent(eventLoop, fd, mask);
}
static void aeGetTime(long *seconds, long *milliseconds)
@ -254,34 +270,18 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int maxfd = 0, numfd = 0, processed = 0;
fd_set rfds, wfds, efds;
aeFileEvent *fe = eventLoop->fileEventHead;
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
/* Check file events */
if (flags & AE_FILE_EVENTS) {
while (fe != NULL) {
if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);
if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);
if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);
if (maxfd < fe->fd) maxfd = fe->fd;
numfd++;
fe = fe->next;
}
}
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int retval;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
@ -301,6 +301,8 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to se the timeout
@ -314,38 +316,24 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
}
}
retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
if (retval > 0) {
fe = eventLoop->fileEventHead;
while(fe != NULL) {
int fd = (int) fe->fd;
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
(fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
(fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
{
int mask = 0;
if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
mask |= AE_WRITABLE;
if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
mask |= AE_EXCEPTION;
fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);
processed++;
/* After an event is processed our file event list
* may no longer be the same, so what we do
* is to clear the bit for this file descriptor and
* restart again from the head. */
fe = eventLoop->fileEventHead;
FD_CLR(fd, &rfds);
FD_CLR(fd, &wfds);
FD_CLR(fd, &efds);
} else {
fe = fe->next;
}
}
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE)
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
if (fe->mask & mask & AE_WRITABLE && fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
if (fe->mask & mask & AE_EXCEPTION &&
fe->efileProc != fe->wfileProc &&
fe->efileProc != fe->rfileProc)
fe->efileProc(eventLoop,fd,fe->clientData,mask);
processed++;
}
}
/* Check time events */