File: ThreadData.cpp

package info (click to toggle)
storm-lang 0.7.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 52,028 kB
  • sloc: ansic: 261,471; cpp: 140,432; sh: 14,891; perl: 9,846; python: 2,525; lisp: 2,504; asm: 860; makefile: 678; pascal: 70; java: 52; xml: 37; awk: 12
file content (391 lines) | stat: -rw-r--r-- 9,138 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
#include "stdafx.h"
#include "ThreadData.h"
#include "UThread.h"
#include "ThreadGroup.h"
#include "Shared.h"

#ifdef WINDOWS
#include <process.h>
// For COM - CoInitializeEx
#include <Objbase.h>
#endif

namespace os {

	/**
	 * Start-up parameters handed from the spawning thread to a newly started thread.
	 *
	 * The new thread reads 'startFn', 'wait' and 'group', stores a pointer to its own
	 * 'ThreadData' in 'data' and then raises 'sema'. After that point the new thread must not
	 * touch this object anymore.
	 */
	class ThreadStart {
	public:
		// Semaphore to indicate successful start.
		Semaphore sema;

		// Pointer to the ThreadData object for the started thread. Null until the started
		// thread has filled it in; valid when 'sema' has been decreased.
		ThreadData *data;

		// Function to execute. Stored as a reference, so the referenced object has to outlive
		// this object.
		const util::Fn<void, void> &startFn;

		// ThreadWait behavior. May be null.
		ThreadWait *wait;

		// Thread group.
		ThreadGroupData *group;

		// Init. 'data' starts out as null so that the destructor never inspects an
		// indeterminate pointer if the started thread did not get as far as setting it.
		ThreadStart(const util::Fn<void, void> &fn, ThreadWait *wait, ThreadGroupData *group) :
			sema(0), data(null), startFn(fn), wait(wait), group(group) {}

		// Remove one reference from 'data', if present. The started thread
		// should set one reference in 'data', so that it may not exit
		// before the 'start' function has returned and reclaimed at least one
		// reference.
		~ThreadStart() {
			if (data)
				data->release();
		}
	};

	// Start a new OS thread that is given 'start' as its parameter. This one is
	// system-specific: one implementation per platform is found further down in this file.
	static void startThread(ThreadStart &start);

	// Capture thread- and OS-specific data for each thread. The result is what gets passed as
	// the 'extra' argument when a 'ThreadData' is created. Also implemented per platform below.
	static void *captureOSExtraData();

	/**
	 * Thread data.
	 */

	// Create the data for a thread. The reference count starts at zero, 'stack' is forwarded to
	// 'uState' together with a pointer to this object, and 'extra' is stored in 'osExtra'.
	ThreadData::ThreadData(void *stack, void *extra) :
		references(0), uState(this, stack), osExtra(extra) {}

	// Empty: no explicit cleanup is performed here.
	ThreadData::~ThreadData() {}

	void ThreadData::reportZero() {
		wakeCond.signal();
	}

	/**
	 * Thread
	 */

	// Create a handle referring to 'data'. A reference is added to 'data' unless it is null.
	Thread::Thread(ThreadData *data) : data(data) {
		if (!this->data)
			return;

		this->data->addRef();
	}

	// Output a thread handle as the text "thread @" followed by the pointer to its data.
	wostream &operator <<(wostream &to, const Thread &o) {
		to << L"thread @";
		to << o.data;
		return to;
	}

	// Forward 'h' to 'attach' on the underlying thread data. The handle is dereferenced without
	// a null check.
	void Thread::attach(Handle h) const {
		this->data->attach(h);
	}

	// Forward 'h' to 'detach' on the underlying thread data. The handle is dereferenced without
	// a null check.
	void Thread::detach(Handle h) const {
		this->data->detach(h);
	}

	// Stack base and OS-specific extra data of the first thread. Both are written by
	// 'Thread::setStackBase' and read by 'Thread::current' when it creates the data for the
	// first thread.
	static void *mainStackBase = null;
	static void *mainStackOSData = null;

	// Get a handle to the calling thread. If no thread data has been registered for the calling
	// thread, it is treated as the first thread: its data is created lazily from the values
	// stored by 'Thread::setStackBase' and then registered.
	Thread Thread::current() {
		if (ThreadData *known = currentThreadData())
			return Thread(known);

		assert(mainStackBase, L"Call 'Thread::setStackBase' before using 'Thread::current'");

		// Data for the first thread, created the first time we get here.
		static ThreadData mainData(mainStackBase, mainStackOSData);
		// A handle that is never destroyed keeps the first thread from firing 'signal' all the time.
		static Thread mainThread(&mainData);

		// Register the data in the thread local variable as well. Otherwise "current" will not
		// work when called from other shared objects, since 'mainStackBase' is static to this file.
		currentThreadData(&mainData);

		return mainThread;
	}

	// Record the stack base of the first thread, along with the OS-specific data captured on the
	// calling thread. 'Thread::current' asserts that a non-null base has been stored here.
	void Thread::setStackBase(void *base) {
		void *osData = captureOSExtraData();

		mainStackBase = base;
		mainStackOSData = osData;
	}

	// A handle that does not refer to any thread (its data pointer is null).
	const Thread Thread::invalid(null);

	// Access the set of stacks kept in the UThread state of this thread.
	const InlineSet<Stack> &Thread::stacks() const {
		return this->data->uState.stacks;
	}

	// Collect the idle UThreads of this thread, as reported by 'idleThreads' on its UThread
	// state, and return them in a new vector.
	vector<UThread> Thread::idleUThreads() {
		vector<UThread> idle;
		this->data->uState.idleThreads(idle);
		return idle;
	}

	// Spawn a thread in 'group' without a specific function to run: a default-constructed
	// function object is handed to the overload taking a function.
	Thread Thread::spawn(ThreadGroup &group) {
		const util::Fn<void> noFunction = util::Fn<void>();
		return spawn(noFunction, group);
	}

	// Spawn a thread in 'group' that executes 'fn'. Blocks until the new thread has raised the
	// start semaphore, at which point its 'ThreadData' is available.
	Thread Thread::spawn(const util::Fn<void, void> &fn, ThreadGroup &group) {
		ThreadStart params(fn, null, group.data);
		startThread(params);

		// Wait for the new thread to publish its data in 'params'.
		params.sema.down();

		Thread spawned(params.data);
		// Consume the additional reference, making sure the thread does not terminate before 'spawn' returns.
		params.data->release();
		return spawned;
	}

	// Spawn a thread in 'group' that uses 'wait' for its wait behavior. The start function is a
	// default-constructed function object. Blocks until the new thread has raised the start
	// semaphore, at which point its 'ThreadData' is available.
	Thread Thread::spawn(ThreadWait *wait, ThreadGroup &group) {
		// Declared before 'params', since 'params' keeps a reference to it.
		util::Fn<void, void> noFunction;
		ThreadStart params(noFunction, wait, group.data);
		startThread(params);

		// Wait for the new thread to publish its data in 'params'.
		params.sema.down();

		Thread spawned(params.data);
		// Consume the additional reference, making sure the thread does not terminate before 'spawn' returns.
		params.data->release();
		return spawned;
	}

	// Main function of every thread started through 'startThread'.
	//
	// Creates the 'ThreadData' for the thread as a local variable, registers it with the thread
	// group, reports it back to the spawning thread through 'start', runs the start function and
	// any UThreads, and finally loops until the thread group reports the thread as unreachable.
	//
	// 'start' belongs to the spawning thread and is only accessed up to the point where
	// 'start.sema' is raised. 'stackBase' is handed to the 'ThreadData' constructor.
	void ThreadData::threadMain(ThreadStart &start, void *stackBase) {
		ThreadData d(stackBase, captureOSExtraData());
		d.addRef(); // Add a reference so that 'd' does not terminate prematurely.

		Thread::initThread();
		threadCreated();

		// Read data from 'start'. Everything needed later is copied to locals here, since
		// 'start' may not be accessed once the semaphore has been raised below.
		ThreadGroupData *group = start.group;
		util::Fn<void, void> fn = start.startFn;
		ThreadWait *wait = start.wait;
		d.wait = wait;

		// Two more references are added here, for a total of three. Only one of the three is
		// released in this function (after the main job below). In this file, 'Thread::spawn'
		// and the destructor of 'ThreadStart' each release one through 'start.data'.

		// One reference is consumed when 'threadWait' is terminated.
		d.addRef();
		// One is used to prevent signaling the semaphore before we have finished doing our main job.
		d.addRef();

		// Remember our identity.
		currentThreadData(&d);

		// Initialize our group.
		group->addRef();
		group->threadStarted(&d);

		// Initialize any 'wait' struct before anyone is able to call 'signal' on it.
		if (wait)
			wait->init();

		// Report back.
		start.data = &d;
		start.sema.up();
		// From here on, do not touch 'start'.

		// Any other initialization required before we start executing code?
		if (wait)
			wait->setup();

		// Run the function we were told to execute.
		fn();

		// Specific wait behavior?
		if (wait) {
			// Keep going for as long as 'd.wait' is set and 'waitForWork' returns true.
			// 'waitForWork' clears 'd.wait' when the wait object's 'wait' function returns false.
			do {
				// Run any spawned UThreads, interleave with anything we need to do.
				do {
					if (d.wait)
						wait->work();
				} while (UThread::leave());

			} while (d.wait && d.waitForWork());

			// Clean up the 'wait' structure.
			d.wait = null; // No more notifications, but we can not delete it yet!
		}

		// Go back to zero references, so that we may terminate!
		d.release();

		while (true) {
			// Either we have more references, or more UThreads to run.
			// Either way, it does not hurt to try to run the UThreads.
			while (UThread::leave())
				;

			// If the refcount is zero, we can safely say that no one else
			// can increase it after this point (assuming no UThreads).
			// At that point no one can add more UThreads either, so in
			// this case we can not have any race-conditions.

			if (atomicRead(d.references) == 0) {
				if (!UThread::any()) {
					// Attempt to shutdown!
					if (group->threadUnreachable(&d))
						break;

					// If we get here, the thread group handed out a new reference while we were
					// looking, and we need to continue working for a while.
				}
			}

			// Wait for the condition to fire. This is done whenever the
			// refcount reaches zero, or when a new UThread has been added.
			d.waitForWork();
		}

		// Now, no one has any knowledge of our existence, we can safely delete the 'wait' now.
		// ('wait' may be null here, in which case the delete does nothing.)
		delete wait;

		// Report we terminated.
		group->threadTerminated();
		group->release();

		// Failsafe for the currThreadData.
		currentThreadData(null);

		d.ioComplete.close();
		threadTerminated();
		Thread::cleanThread();
	}

	// Wake the thread from 'waitForWork'. Depending on whether 'wait' is set, the thread sleeps
	// either on 'wakeCond' or inside the 'wait' object, so both are signaled.
	// NOTE(review): 'wait' is read here while 'threadMain' and 'waitForWork' may set it to null;
	// confirm how callers on other threads are synchronized with that.
	void ThreadData::reportWake() {
		// We need to signal both in case we're in the process of exiting from the 'wait' behaviour.
		wakeCond.signal();
		if (wait)
			wait->signal();
	}

	bool ThreadData::waitForWork() {
		bool result = false;
		checkIo();

		nat sleepFor = 0;
		if (uState.nextWake(sleepFor)) {
			if (sleepFor > 0) {
				if (wait) {
					if (!wait->wait(ioComplete, sleepFor))
						wait = null;
					else
						result = true;
				} else {
					wakeCond.wait(ioComplete, sleepFor);
				}
			} else if (wait) {
				// Just assume 'wait' shall remain, since we did not ask it about its desires.
				result = true;
			}
			uState.wakeThreads();
		} else {
			if (wait) {
				if (!wait->wait(ioComplete))
					wait = null;
				else
					result = true;
			} else {
				wakeCond.wait(ioComplete);
			}
		}

		checkIo();
		return result;
	}

	// Let 'ioComplete' deliver its notifications, passing this thread data along.
	void ThreadData::checkIo() {
		this->ioComplete.notifyAll(this);
	}

#ifdef WINDOWS
	// Windows-specific implementation.

#ifdef X86

	// If we are on X86, there is an option to enable another layer on top of SAFESEH: namely, that
	// the system examines the last element of the SEH linked list and verifies that it points to a
	// particular function (FinalExceptionHandler in ntdll, there are a number of offsets there, and
	// we probably need to point to the right one).

	// One element in the linked list of SEH records, as traversed by 'captureOSExtraData'.
	// NOTE(review): the member order is presumably dictated by the layout the OS uses for these
	// records - do not reorder without confirming.
	struct SehRecord {
		// Next record in the chain. The chain ends with the value 0xFFFFFFFF rather than null.
		SehRecord *prev;
		// Handler function for this record.
		void *handler;
	};

	// Find the handler stored in the last record of the calling thread's SEH chain.
	// Returns null if the chain contains no records.
	static void *captureOSExtraData() {
		// Capture the top of the SEH chain.
		SehRecord *first = null;
		__asm {
			mov eax, fs:[0];
			mov first, eax;
		}

		// Traverse it until we reach 0xFFFFFFFF, and remember the last handler.
		void *lastHandler = null;
		while (size_t(first) != size_t(-1)) {
			lastHandler = first->handler;
			first = first->prev;
		}

		return lastHandler;
	}

#else

	// No extra data is captured on Windows when not building for X86.
	static void *captureOSExtraData() {
		return null;
	}

#endif

	static void winThreadMain(void *param) {
		ThreadStart *s = (ThreadStart *)param;
		ThreadData::threadMain(*s, &param);
	}

	// Start a new thread that runs 'winThreadMain' with a pointer to 'start' as its parameter.
	// The stack size is given as zero, and the result of '_beginthread' is not examined.
	static void startThread(ThreadStart &start) {
		void *param = &start;
		_beginthread(&winThreadMain, 0, param);
	}

	void Thread::initThread() {
		CoInitializeEx(NULL, COINIT_APARTMENTTHREADED | COINIT_SPEED_OVER_MEMORY);
	}

	void Thread::cleanThread() {
		CoUninitialize();
	}

#else

	static void *posixThreadMain(void *param) {
		ThreadStart *s = (ThreadStart *)param;
		ThreadData::threadMain(*s, &param);
		return null;
	}

	// Start a new, detached thread that runs 'posixThreadMain' with a pointer to 'start' as its
	// parameter.
	//
	// When 'pthread_create' fails, the contents of 'thread' are undefined, so the thread is only
	// detached when creation succeeded. This function has no means of reporting the failure.
	// NOTE(review): a caller that waits for 'start.sema' afterwards (as 'Thread::spawn' does) is
	// not woken in that case - consider reporting the error to the caller.
	static void startThread(ThreadStart &start) {
		pthread_t thread;
		if (pthread_create(&thread, null, &posixThreadMain, &start) == 0)
			pthread_detach(thread);
	}

	// No extra data is captured on non-Windows systems.
	static void *captureOSExtraData() {
		return null;
	}

	// No per-thread initialization is done on non-Windows systems.
	void Thread::initThread() {}

	// No per-thread cleanup is done on non-Windows systems.
	void Thread::cleanThread() {}

#endif

	// Default implementations for 'ThreadWait'. All of them are empty.

	// Empty destructor.
	ThreadWait::~ThreadWait() {}

	// Called from 'threadMain' before the new thread is reported back to the spawning thread.
	// Does nothing by default.
	void ThreadWait::init() {}

	// Called from 'threadMain' after the new thread has been reported back, before the start
	// function is executed. Does nothing by default.
	void ThreadWait::setup() {}

	// Called from 'threadMain' between runs of UThreads while the wait behavior is active.
	// Does nothing by default.
	void ThreadWait::work() {}

}