File: IOHandle.cpp

package info (click to toggle)
storm-lang 0.7.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 52,028 kB
  • sloc: ansic: 261,471; cpp: 140,432; sh: 14,891; perl: 9,846; python: 2,525; lisp: 2,504; asm: 860; makefile: 678; pascal: 70; java: 52; xml: 37; awk: 12
file content (321 lines) | stat: -rw-r--r-- 8,026 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
#include "stdafx.h"
#include "IOHandle.h"
#include "IORequest.h"

#ifdef LINUX_IO_URING
#include <sys/eventfd.h>
#endif

namespace os {

#if defined(WINDOWS)

	// Create an IOHandle that has no underlying handle yet and no pending requests.
	IOHandle::IOHandle()
		: handle(NULL),
		  pending(0) {
	}

	// Wrap the existing handle 'h'. The count of pending requests starts at zero.
	IOHandle::IOHandle(HANDLE h)
		: handle(h),
		  pending(0) {
	}

	HANDLE IOHandle::v() const {
		/**
		 * NOTE: Windows 7 seems to (a bug?) keep an IO Completion Port signaled at all times as
		 * long as no file handles are associated with it. To work around this, we count the
		 * pending IO requests and pretend that there is no IO Completion Port at all whenever this
		 * thread has no outstanding requests.
		 */
		if (atomicRead(pending) <= 0)
			return NULL;

		return handle;
	}

	void IOHandle::add(Handle h, const ThreadData *id) {
		// Associate 'h' with our completion port, passing 'id' as the completion key.
		HANDLE port = CreateIoCompletionPort(h.v(), handle, (ULONG_PTR)id, 1);

		// The call fails if the handle does not have the OVERLAPPED flag set. In that case we
		// silently keep whatever handle we had before.
		// PLN(L"ERROR: " << GetLastError());
		if (port != NULL)
			handle = port;
	}

	// Counterpart to 'add'. Intentionally does nothing in the Windows implementation.
	void IOHandle::remove(Handle h, const ThreadData *id) {
		// Not needed.
	}

	void IOHandle::notifyAll(const ThreadData *id) const {
		if (!handle)
			return;

		while (true) {
			DWORD bytes = 0;
			ULONG_PTR key = 0;
			OVERLAPPED *request = NULL;
			BOOL ok = GetQueuedCompletionStatus(handle, &bytes, &key, &request, 0);
			int error = GetLastError();

			if (request) {
				// PLN(L"Got status: " << bytes << L", " << key << L", " << request << L", " << ok);
				if ((ULONG_PTR)id == key) {
					IORequest *r = (IORequest *)request;

					if (ok)
						r->complete(nat(bytes));
					else
						r->failed(nat(bytes), error);
				}
			} else {
				// Nothing was dequeued, abort.
				break;
			}
		}
	}

	// Close the underlying handle (if any) and reset it to NULL.
	void IOHandle::close() {
		if (handle) {
			CloseHandle(handle);
			handle = NULL;
		}
	}

	// Note that one more IO request is pending. 'v' only exposes the handle while this count is
	// positive.
	void IOHandle::attach() {
		atomicIncrement(pending);
	}

	// Note that one pending IO request is done. The assert checks that the counter is still
	// positive before it is decremented, i.e. that there was a matching call to 'attach'.
	void IOHandle::detach() {
		assert(atomicRead(pending) > 0);
		atomicDecrement(pending);
	}

#elif defined(LINUX_IO_URING)

	// Create the IOHandle: set up an eventfd, attach it to 'linuxIO', and prepare the pollfd
	// entry that watches it.
	IOHandle::IOHandle() : linuxIO() {
		// Note: IOCondition already contains an eventfd, and we *could* use that one directly
		// instead of creating our own. We would, however, still need a fallback that uses our own
		// eventfd in order to be compatible with the Gtk integration in the Gui library.

		// Create an eventfd and set it up to receive notifications.
		eventfd = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
		if (eventfd < 0) {
			perror("Failed to create an eventfd");
			assert(false);
		}

		linuxIO.attachEventfd(eventfd);

		// Entry 1 of 'pollfds' waits for the eventfd to become readable.
		auto &slot = pollfds[1];
		slot.fd = eventfd;
		slot.events = POLLIN;
		slot.revents = 0;
	}

	// Release the eventfd unless it has already been closed.
	IOHandle::~IOHandle() {
		// 'close' sets the descriptor to -1, so a negative value means nothing is left to release.
		if (eventfd < 0)
			return;

		::close(eventfd);
	}

	// Close the eventfd and mark it as closed (-1) so that the destructor does not close it again.
	// Safe to call more than once.
	void IOHandle::close() {
		// Guard the same way the destructor does: if the descriptor is already -1 (a previous call
		// to 'close', or creation failed) we must not pass it to ::close.
		if (eventfd >= 0)
			::close(eventfd);
		eventfd = -1;
	}

	// Register 'request' as active for the handle 'h' and submit it to 'linuxIO'.
	void IOHandle::attach(Handle h, IORequest *request) {
		util::Lock::L z(lock);

		// Remember the request both by pointer and by (fd, request).
		activeRequests.insert(request);
		activeFDs.insert(std::make_pair(h.v(), request));

		// user_data is set to the address of the request. Otherwise we won't know what is what
		// when the completion arrives.
		// Idea: We could simply put active requests in an array and have user_data be indices into
		// that array. That would be cheaper, but it would make duplicate notifications more
		// notable, however.
		auto &job = request->request;
		job.user_data = reinterpret_cast<size_t>(request);
		linuxIO.submit(job);
	}

	// Ask for cancellation of everything outstanding on 'h' (via 'remove') and then register
	// 'request' for it (via 'attach'), without releasing the lock in between.
	void IOHandle::attachAndRemove(Handle h, IORequest *request) {
		util::Lock::L z(lock);

		// We do these two while holding the lock. The key thing is that 'remove' does not block.
		// NOTE(review): 'remove' and 'attach' each take 'lock' again, so this presumably relies on
		// util::Lock being recursive -- confirm.
		remove(h, null);
		attach(h, request);
	}

	// Forget 'request' for the handle 'h': it is dropped from both bookkeeping containers.
	void IOHandle::detach(Handle h, IORequest *request) {
		util::Lock::L z(lock);

		activeFDs.erase(std::make_pair(h.v(), request));
		activeRequests.erase(request);
	}

	// Submit a cancellation for 'request', provided that it is still registered as active.
	void IOHandle::cancel(IORequest *request) {
		util::Lock::L z(lock);

		// Requests we do not know about are left alone.
		if (activeRequests.count(request) == 0)
			return;

		// 'addr' carries the same value that 'attach' stored in the user_data of the request.
		// The cancel job itself uses user_data = 0, which 'notifyAll' treats as an internal
		// request.
		struct io_uring_sqe cancelJob;
		zeroMem(cancelJob);
		cancelJob.opcode = IORING_OP_ASYNC_CANCEL;
		cancelJob.addr = size_t(request);
		cancelJob.user_data = 0;
		linuxIO.submit(cancelJob);
	}

	// Check whether the eventfd was signalled and, if so, process all completions available from
	// 'linuxIO'. Each completion that belongs to an active request is removed from
	// 'activeRequests' and reported through 'onFinish'. The parameter 'id' is unused here.
	void IOHandle::notifyAll(const ThreadData *id) {
		UNUSED(id);

		// Read the eventfd to see if we should check the LinuxIO object. This also resets the event
		// to non-signalling.
		uint64_t fromfd = 0;
		ssize_t readResult = read(eventfd, &fromfd, sizeof(fromfd));
		// A successful read of an eventfd always yields exactly 8 bytes. If the event is not
		// signalled, the non-blocking read fails and returns -1 (errno = EAGAIN), not 0. Treat
		// anything but a complete read of a non-zero counter as "not signalled".
		if (readResult != ssize_t(sizeof(fromfd)) || fromfd == 0)
			return;

		// All good, check the LinuxIO implementation!
		struct io_uring_cqe entry;
		while (linuxIO.get(entry)) {
			IORequest *request = reinterpret_cast<IORequest *>(entry.user_data);
			if (request == null) {
				// We use user_data = 0 for internal requests to cancel. We can just ignore them if
				// they are successful.
				if (entry.res) {
					WARNING(L"Failing internal io_uring request: " << entry.res);
				}
				continue;
			}
			{
				util::Lock::L z(lock);
				// This has two purposes: we both remove the request from 'active' (we received a
				// response, we don't expect another), and check that we did not get a spurious
				// request from somewhere and write to random locations in memory.
				if (activeRequests.erase(request) == 0) {
					// This is only useful for debugging.
					WARNING(L"Ignoring completion to detached request: " << request);
					continue;
				}
			}

			// We want to do this outside of the lock.
			request->onFinish(entry.res);
		}
	}

	// Nothing to do here in the io_uring implementation: both parameters are ignored. Handles
	// are instead recorded per request in 'attach'.
	void IOHandle::add(Handle h, const ThreadData *id) {
		UNUSED(h);
		UNUSED(id);
	}

	// Ask for cancellation of all requests that are registered for the handle 'h'. The requests
	// stay in the bookkeeping containers. The parameter 'id' is unused.
	void IOHandle::remove(Handle h, const ThreadData *id) {
		UNUSED(id);

		util::Lock::L z(lock);

		// 'activeFDs' holds (fd, request) pairs. Assuming the default ordering of pairs, all
		// entries for this fd are found in the range [(fd, null), (fd + 1, null)).
		IORequest *none = null;
		auto it = activeFDs.lower_bound(std::make_pair(h.v(), none));
		const auto last = activeFDs.lower_bound(std::make_pair(h.v() + 1, none));

		while (it != last) {
			// Note: 'cancel' does not modify 'activeFDs', so the iterators remain usable.
			cancel(it->second);
			++it;
		}
	}

	// Describe what to wait for: the 'pollfds' array together with its entry count (2).
	IOHandle::Desc IOHandle::desc() {
		Desc result = { pollfds, 2 };
		return result;
	}

#elif defined(POSIX)

	// Nothing to set up explicitly in the poll-based implementation.
	IOHandle::IOHandle() {}

	// Nothing to release explicitly in the poll-based implementation.
	IOHandle::~IOHandle() {}

	// Translate the kind of an IO request into the poll() event to wait for: POLLIN for reads,
	// POLLOUT for writes, and 0 (no events) for anything else.
	static short type(IORequest::Type type) {
		if (type == IORequest::read)
			return POLLIN;
		if (type == IORequest::write)
			return POLLOUT;
		return 0;
	}

	// Start waiting for the handle 'h' on behalf of the request 'wait'.
	void IOHandle::attach(Handle h, IORequest *wait) {
		util::Lock::L z(lock);

		// The poll() events to wait for follow from the kind of request.
		const short events = type(wait->type);
		handles.put(h.v(), events, wait);
	}

	// Stop waiting for the handle 'h' on behalf of 'wait'. Only the first entry for 'h' that
	// belongs to 'wait' is removed.
	void IOHandle::detach(Handle h, IORequest *wait) {
		util::Lock::L z(lock);

		// Step through the entries for this handle. A position at or past the capacity means that
		// there are no more entries.
		nat pos = handles.find(h.v());
		while (pos < handles.capacity()) {
			if (handles.valueAt(pos) == wait) {
				handles.remove(pos);
				break;
			}
			pos = handles.next(pos);
		}
	}

	// Wake the requests whose handles have events. Events already recorded in the pollfd array
	// are used if present. Otherwise a non-blocking poll() is made. The parameter 'id' is unused.
	void IOHandle::notifyAll(const ThreadData *id) {
		UNUSED(id);
		util::Lock::L z(lock);

		// The entries for our handles start one element into the array.
		struct pollfd *fds = handles.data() + 1;
		const size_t count = handles.capacity();

		// See if we need to ask the OS for new events, or if some are already recorded.
		bool recorded = false;
		for (size_t i = 0; i < count && !recorded; i++)
			recorded = fds[i].fd >= 0 && fds[i].revents != 0;

		if (!recorded) {
			// Find new events without blocking. If there are none (or poll failed), we are done.
			if (poll(fds, count, 0) <= 0)
				return;
		}

		for (size_t i = 0; i < count; i++) {
			struct pollfd &entry = fds[i];
			if (entry.revents) {
				// Something happened! Wake the request, if there is one for this entry.
				IORequest *r = handles.valueAt(i);
				if (r)
					r->wake.set();
			}

			// Clear the event so that it is not reported again.
			entry.revents = 0;
		}
	}

	// Describe the pollfd array to wait on. The reported count is one more than the capacity of
	// 'handles', which matches 'notifyAll' treating the handle entries as starting at index 1.
	IOHandle::Desc IOHandle::desc() {
		util::Lock::L z(lock);
		Desc d = { handles.data(), handles.capacity() + 1 };
		return d;
	}

	// Intentionally empty: handles are only recorded once a request is attached (see 'attach').
	void IOHandle::add(Handle h, const ThreadData *id) {
		// Nothing to do on POSIX.
	}

	// Drop all entries for the handle 'h'. Every affected request is flagged as closed and woken
	// before its entry is removed.
	void IOHandle::remove(Handle h, const ThreadData *id) {
		util::Lock::L z(lock);

		// Note: we search from the start again after each removal, rather than stepping with
		// 'next' from a position that was just removed.
		nat pos = handles.find(h.v());
		while (pos < handles.capacity()) {
			// Mark the pending request as 'complete' by telling it the handle was closed.
			IORequest *waiting = handles.valueAt(pos);
			waiting->closed = true;
			waiting->wake.set();

			// Remove!
			handles.remove(pos);
			pos = handles.find(h.v());
		}
	}

	// Intentionally empty in the poll-based implementation.
	void IOHandle::close() {
		// Nothing to do.
	}

#endif

}