#define _POSIX_C_SOURCE 200809L
#include "io_internal.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#include <sys/time.h>
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#endif
#include <errno.h>
#ifdef __dietlibc__
#include <threads.h>
#else
#include <semaphore.h>
#endif
// return the next event, waiting if none are queued
// wait at most timeout milliseconds
// on success, return 1 and return the fd in *s and the events on it in *revents
// if we waited but ran into a timeout, return 0
// if we run into a system error, return -1
// if another thread aborted this iomux, return -2
int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
for (;;) {
/* If we have an event in the queue, use that one */
int r;
if (c->working==-2) return -2; /* iomux was aborted */
for (;;) { // CAS-loop get the first element from the queue
unsigned int f=c->l; // c is a ring buffer, c->l is low, c->h is high
// f is here to prevent double fetches from the volatile low water mark
if (f == c->h)
break; /* no elements in queue */
// We want to grab the first element but other threads might be
// racing us. So first grab the event from the low water mark in
// the ring buffer, then increment the low water mark atomically,
// and if that worked we know we grabbed the right event.
int n=(f+1)%SLOTS; // next value for c->l
*s=c->q[f].fd; // grab event we think is next in line
*revents=c->q[f].events;
// now atomically increment low water mark
if (__sync_bool_compare_and_swap(&c->l,f,n)) {
/* Nobody snatched the event from us. Report success */
return 1;
}
/* collided with another thread, try again */
}
/* The queue was empty. If someone else is already calling
 * epoll_wait/kevent, then use the semaphore */
if (__sync_bool_compare_and_swap(&c->working,0,1)) {
/* If we get here, we got the lock and no other thread is
 * attempting to fill the queue at the same time. However,
 * another thread could have interrupted and refilled the job
 * queue already, so check if that happened. */
if (c->h != c->l) {
/* set working back to 0 unless someone set it to -2 in the mean time (iom_abort) */
if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
continue; // this is why we have an outer for loop, so we don't need goto here
}
/* We have the job to fill the struct. */
#ifdef HAVE_EPOLL
struct epoll_event ee[SLOTS];
int i;
r=epoll_wait(c->ctx, ee, SLOTS, timeout);
if (r <= 0) {
/* epoll_wait returned a timeout or an error! */
/* relinquish the lock and return 0 / -1 */
if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
#ifdef __dietlibc__
// for timeout we want to hand off to one other thread, no need
// to wake them all up. Error might be transient (EINTR) and the
// next guy might succeed, so only wake one up. If the error was
// not transient, then they will also get an error and wake the
// next up
cnd_signal(&c->sem);
#elif defined(__APPLE__)
pthread_cond_signal(&c->sem);
#else
sem_post(&c->sem);
#endif
return r;
}
for (i=0; i<r; ++i) {
/* convert events */
int e = ((ee[i].events & (EPOLLIN|EPOLLHUP|EPOLLERR)) ? IOM_READ : 0) |
((ee[i].events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) ? IOM_WRITE : 0) |
((ee[i].events & EPOLLERR) ? IOM_ERROR : 0);
if (i+1==r) {
/* return last event instead of enqueueing it */
*s=ee[i].data.fd;
*revents=e;
/* loop terminates here, but no return statement because we
 * still need to signal the semaphore below */
} else {
/* The CAS loop on c->working above ensures we are the only one writing to c->h */
size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE); // c->h is volatile so make copy to perf-avoid double fetch
c->q[hcapture].fd=ee[i].data.fd;
c->q[hcapture].events=e;
// c->h = (hcapture + 1) % SLOTS;
// use __atomic_store so ARM hardware writes c->q before c->h
__atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
}
}
#elif defined(HAVE_KQUEUE)
struct kevent kev[SLOTS];
struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 };
int i;
/* note: use the outer r (no shadowing), same as the epoll branch */
r=kevent(c->ctx, 0, 0, kev, SLOTS, &ts);
if (r<=0) {
/* kevent returned a timeout or an error! */
/* relinquish the lock and return 0 / -1 */
if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
#ifdef __dietlibc__
// no dietlibc for kqueue based systems yet
cnd_broadcast(&c->sem);
#elif defined(__APPLE__)
pthread_cond_broadcast(&c->sem);
#else
sem_post(&c->sem);
#endif
return r;
}
for (i=0; i<r; ++i) {
/* convert events */
int e = (kev[i].filter == EVFILT_READ ? IOM_READ : 0) |
(kev[i].filter == EVFILT_WRITE ? IOM_WRITE : 0);
if (i+1==r) {
/* return last event instead of enqueueing it */
*s=kev[i].ident;
*revents=e;
/* loop terminates here, but no return statement because we
 * still need to signal the semaphore below */
} else {
/* The CAS loop on c->working above ensures we are the only one writing to c->h */
size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE); // c->h is volatile so make copy to perf-avoid double fetch
c->q[hcapture].fd=kev[i].ident;
c->q[hcapture].events=e;
// c->h = (c->h + 1) % SLOTS;
// use __atomic_store so ARM hardware writes c->q before c->h
__atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
}
}
#else
#warning "only epoll and kqueue supported for now"
#endif
/* We need to signal the other threads.
   Either there are other events left, or we need one of them to
   wake up and call epoll_wait/kevent next, because we aren't
   doing it anymore */
if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
#ifdef __dietlibc__
if (c->h == (c->l + 1) % SLOTS)
cnd_signal(&c->sem);
else
cnd_broadcast(&c->sem);
#elif defined(__APPLE__)
if (c->h == (c->l + 1) % SLOTS)
pthread_cond_signal(&c->sem);
else
pthread_cond_broadcast(&c->sem);
#else
sem_post(&c->sem);
#endif
return 1;
} else {
/* somebody else has the job to fill the queue */
/* Build an absolute CLOCK_REALTIME deadline for the timed wait:
 * now + timeout milliseconds. */
struct timespec ts;
struct timeval tv;
gettimeofday(&tv, NULL);
tv.tv_sec += timeout/1000;
/* timeout is in milliseconds but tv_usec counts microseconds, so
 * the sub-second remainder must be scaled by 1000 (previously it
 * was added unscaled, cutting the fractional wait 1000-fold) */
tv.tv_usec += (timeout%1000)*1000;
if (tv.tv_usec>=1000000) { /* keep tv_usec in [0,999999] */
tv.tv_usec-=1000000;
++tv.tv_sec;
}
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
#ifdef __dietlibc__
r=cnd_timedwait(&c->sem,&c->mtx,&ts);
switch (r) {
case thrd_success:
continue;
case thrd_timedout:
return 0;
case thrd_error:
return -1;
}
#elif defined(__APPLE__)
r=pthread_cond_timedwait(&c->sem,&c->mtx,&ts);
switch (r) {
case 0: continue;
case ETIMEDOUT: return 0;
default: return -1;
}
#else
r=sem_timedwait(&c->sem,&ts);
if (r==-1) {
if (errno==ETIMEDOUT) return 0;
return -1;
}
#endif
/* fall through into next loop iteration */
}
}
}