File: iom_wait.c

#define _POSIX_C_SOURCE 200809L
#include "io_internal.h"
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#include <sys/time.h>
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#endif
#include <errno.h>
#ifdef __dietlibc__
#include <threads.h>
#else
#include <semaphore.h>
#endif
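
// Wakeup primitive, per platform: dietlibc uses a C11 cnd_t, macOS a
// pthread condition variable (unnamed POSIX semaphores are not usable
// there), everything else a POSIX semaphore. The c->sem and c->mtx
// fields come with iomux_t via io_internal.h.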

// return the next event, waiting if none are queued
// wait at most timeout milliseconds
// on success, return 1 and store the fd in *s and its events in *revents
// if we waited but ran into a timeout, return 0
// if we ran into a system error, return -1
// if another thread aborted this iomux, return -2
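//
// A minimal caller sketch (hypothetical worker loop; assumes a mux that
// was set up elsewhere, e.g. with iom_init() and iom_add() from this
// library):
//
//   int64 fd; unsigned int ev;
//   for (;;) {
//     int r=iom_wait(&mux, &fd, &ev, 1000);	// wait up to 1 second
//     if (r==1) handle_event(fd, ev);	// hypothetical handler
//     else if (r==0) continue;	// timeout, poll again
//     else break;	// -1 system error, -2 aborted via iom_abort
//   }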
int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
  for (;;) {
    /* If we have an event in the queue, use that one */
    int r;
    if (c->working==-2) return -2;	/* iomux was aborted */

    for (;;) {		// CAS-loop get the first element from the queue
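      // Ring-buffer sketch this loop relies on: slots c->l .. c->h-1
      // (mod SLOTS) hold pending events; l == h means empty. At most
      // SLOTS-1 events are ever queued (the filler below returns one
      // directly), so "full" never looks like "empty".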
      unsigned int f=c->l;	// c->q is a ring buffer; c->l is the low mark, c->h the high mark
      // f is a local copy so we don't fetch the volatile low water mark twice
      if (f == c->h)
	break; /* no elements in queue */
      // We want to grab the first element but other threads might be
      // racing us. So first grab the event from the low water mark in
      // the ring buffer, then increment the low water mark atomically,
      // and if that worked we know we grabbed the right event.
      int n=(f+1)%SLOTS;	// next value for c->l

      *s=c->q[f].fd;		// grab event we think is next in line
      *revents=c->q[f].events;

      // now atomically increment low water mark
      if (__sync_bool_compare_and_swap(&c->l,f,n)) {
	/* Nobody snatched the event from us. Report success */
	return 1;
      }
      /* collided with another thread, try again */
    }

    /* The queue was empty. If someone else is already calling
     * epoll_wait/kevent, then wait on the semaphore instead */
    if (__sync_bool_compare_and_swap(&c->working,0,1)) {
      /* If we get here, we got the lock and no other thread is
       * attempting to fill the queue at the same time. However,
       * another thread could have interrupted and refilled the job
       * queue already, so check if that happened. */
      if (c->h != c->l) {
	/* set working back to 0 unless someone set it to -2 in the meantime (iom_abort) */
	if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
	continue;	// this is why we have an outer for loop, so we don't need goto here
      }

      /* We have the job to fill the struct. */

#ifdef HAVE_EPOLL
      struct epoll_event ee[SLOTS];
      int i;
      r=epoll_wait(c->ctx, ee, SLOTS, timeout);
      if (r <= 0) {
	/* epoll_wait returned a timeout or an error! */
	/* relinquish the lock and return 0 / -1 */
	if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
#ifdef __dietlibc__
	// For a timeout we only want to hand off to one other thread; no
	// need to wake them all up. An error might be transient (EINTR)
	// and the next thread might succeed, so also wake only one. If
	// the error was not transient, that thread will get the same
	// error and wake up the next one in turn.
	cnd_signal(&c->sem);
#elif defined(__APPLE__)
	pthread_cond_signal(&c->sem);
#else
	sem_post(&c->sem);
#endif
	return r;
      }
      for (i=0; i<r; ++i) {
	/* convert events */
	int e = ((ee[i].events & (EPOLLIN|EPOLLHUP|EPOLLERR)) ? IOM_READ : 0) |
	        ((ee[i].events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) ? IOM_WRITE : 0) |
	        ((ee[i].events & EPOLLERR) ? IOM_ERROR : 0);
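	// EPOLLHUP/EPOLLERR are folded into both IOM_READ and IOM_WRITE
	// so a thread blocked on either direction wakes up when the peer
	// closes the connection or the fd errors out.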
	if (i+1==r) {
	  /* return last event instead of enqueueing it */
	  *s=ee[i].data.fd;
	  *revents=e;
	  /* loop terminates here, but no return statement because we
	   * still need to signal the semaphore below */
	} else {
	  /* The CAS loop on c->working above ensures we are the only one writing to c->h */
	  size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE);	// c->h is volatile; keep a local copy to avoid a double fetch
	  c->q[hcapture].fd=ee[i].data.fd;
	  c->q[hcapture].events=e;
//	  c->h = (hcapture + 1) % SLOTS;
	  // release store so weakly ordered CPUs (e.g. ARM) commit c->q before c->h
	  __atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
	}
      }
#elif defined(HAVE_KQUEUE)
      struct kevent kev[SLOTS];
      struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 };
      r=kevent(c->ctx, 0, 0, kev, SLOTS, &ts);
      int i;
      if (r<=0) {
	/* kevent returned a timeout or an error! */
	/* relinquish the lock and return 0 / -1 */
	if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
#ifdef __dietlibc__
	// dietlibc does not target kqueue-based systems yet
	cnd_broadcast(&c->sem);
#elif defined(__APPLE__)
	pthread_cond_broadcast(&c->sem);
#else
	sem_post(&c->sem);
#endif
	return r;
      }
      for (i=0; i<r; ++i) {
	/* convert events */
	int e = (kev[i].filter == EVFILT_READ ? IOM_READ : 0) |
	        (kev[i].filter == EVFILT_WRITE ? IOM_WRITE : 0);
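	// Unlike epoll's bitmask, kqueue delivers EVFILT_READ and
	// EVFILT_WRITE as separate kevents, so each entry maps to at
	// most one of IOM_READ/IOM_WRITE.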
	if (i+1==r) {
	  /* return last event instead of enqueueing it */
	  *s=kev[i].ident;
	  *revents=e;
	  /* loop terminates here, but no return statement because we
	   * still need to signal the semaphore below */
	} else {
	  /* The CAS loop on c->working above ensures we are the only one writing to c->h */
	  size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE);	// c->h is volatile; keep a local copy to avoid a double fetch
	  c->q[hcapture].fd=kev[i].ident;
	  c->q[hcapture].events=e;
//	  c->h = (hcapture + 1) % SLOTS;
	  // release store so weakly ordered CPUs (e.g. ARM) commit c->q before c->h
	  __atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
	}
      }
#else
#warning "only epoll and kqueue supported for now"
#endif
      /* We need to signal the other threads.
	 Either there are other events left, or we need one of them to
	 wake up and call epoll_wait/kevent next, because we aren't
	 doing it anymore */
      if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
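      // Exactly one queued event: waking a single thread suffices.
      // Otherwise broadcast so several consumers can drain the queue
      // (or one of them takes over the epoll_wait/kevent job).
      // A plain sem_post wakes one waiter per post anyway.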
#ifdef __dietlibc__
      if (c->h == (c->l + 1) % SLOTS)
	cnd_signal(&c->sem);
      else
	cnd_broadcast(&c->sem);
#elif defined(__APPLE__)
      if (c->h == (c->l + 1) % SLOTS)
	pthread_cond_signal(&c->sem);
      else
	pthread_cond_broadcast(&c->sem);
#else
      sem_post(&c->sem);
#endif
      return 1;
    } else {
      /* somebody else has the job to fill the queue */
      struct timespec ts;
      struct timeval tv;
      gettimeofday(&tv, NULL);
      tv.tv_sec += timeout/1000;
      tv.tv_usec += (timeout%1000)*1000;	/* timeout is in milliseconds */
      if (tv.tv_usec>=1000000) {
	tv.tv_usec-=1000000;
	++tv.tv_sec;
      }
      ts.tv_sec = tv.tv_sec;
      ts.tv_nsec = tv.tv_usec * 1000;
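      // All three waits below take an absolute CLOCK_REALTIME deadline,
      // which is why the relative timeout was added to the wall clock
      // above rather than passed directly.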
#ifdef __dietlibc__
      r=cnd_timedwait(&c->sem,&c->mtx,&ts);
      switch (r) {
      case thrd_success:
	continue;
      case thrd_timedout:
	return 0;
      case thrd_error:
	return -1;
      }
#elif defined(__APPLE__)
      r=pthread_cond_timedwait(&c->sem,&c->mtx,&ts);
      switch (r) {
      case 0: continue;
      case ETIMEDOUT: return 0;
      default: return -1;
      }
#else
      r=sem_timedwait(&c->sem,&ts);
      if (r==-1) {
	if (errno==ETIMEDOUT) return 0;
	return -1;
      }
#endif
      /* fall through into next loop iteration */
    }
  }
}