File: gsCore.c

package info (click to toggle)
openmohaa 0.82.1%2Bdfsg-1
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid
  • size: 34,192 kB
  • sloc: cpp: 315,720; ansic: 275,789; sh: 312; xml: 246; asm: 141; makefile: 7
file content (446 lines) | stat: -rw-r--r-- 12,934 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Core task/callback manager
#include "gsPlatform.h"
#include "gsPlatformThread.h"

#include "gsCommon.h"
#include "gsCore.h"
#include "gsAssert.h"
#include "../ghttp/ghttp.h"




// This defines how long the core will wait if there is a thread synchronization
// problem when initializing or shutting down the core.  
#define GSI_CORE_INIT_YIELD_MS      100
#define GSI_CORE_SHUTDOWN_YIELD_MS  50


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
static GSCoreMgr* gsiGetStaticCore()
{
	static GSCoreMgr gStaticCore;
	return &gStaticCore;
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// This is registered with the ANSI atexit() function
//     - don't do anything that might fail
//     - don't do anything that won't complete instantly
//     - don't do anything that requires other objects/resources to exist
static void gsiCoreAtExitShutdown(void)
{
	// delete queue critical section
	GSCoreMgr * aCore = gsiGetStaticCore();
	gsiDeleteCriticalSection(&aCore->mQueueCrit);
	GSI_UNUSED(aCore);
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Increment core ref count, initialize the core if necessary
//     - WARNING:  This code is a bit tricky do to multithread issues
void gsCoreInitialize()
{
	GSCoreMgr* aCore = gsiGetStaticCore();

	// Is someone else shutting down the core?
	while(gsi_is_true(aCore->mIsShuttingDown))
		msleep(GSI_CORE_INIT_YIELD_MS); // yield to other thread

	// If we're the first reference, initialize the core
	if (gsiInterlockedIncrement(&aCore->mRefCount) == 1)
	{
		// Are we the first ever?
		if (gsi_is_false(aCore->mIsStaticInitComplete))
		{
			// perform one-time initialization of core critical section
			gsiInitializeCriticalSection(&aCore->mQueueCrit);

			// register function to destroy critical section at program termination
			#ifndef _MANAGED
				atexit(gsiCoreAtExitShutdown);
			#endif

			// one time init completed
			aCore->mIsStaticInitComplete = gsi_true;
		}

		// take the critical section to begin initialization
		// this is necessary in case another thread began shutdown before we incremented ref count
		gsiEnterCriticalSection(&aCore->mQueueCrit);
		gsiLeaveCriticalSection(&aCore->mQueueCrit);

		// wait here if another thread is concurrently shutting down the core
		// we may need to wait a few times if the shutdown does not complete immediately
		while(gsi_is_true(aCore->mIsShuttingDown))
			msleep(GSI_CORE_INIT_YIELD_MS);

		// Setup the task array
		#ifdef GSICORE_DYNAMIC_TASK_LIST
			aCore->mTaskArray = ArrayNew(sizeof(GSTask*), 10, NULL);
			GS_ASSERT(aCore->mTaskArray);
		#else
			memset(aCore->mTaskArray, 0, sizeof(aCore->mTaskArray));
		#endif

		// Init http sdk (ghttp is ref counted)
		ghttpStartup();

		// release other threads that may have blocked during init
		//     - this must be the last thing done at end of init
		aCore->mIsInitialized = gsi_true;
	}
	else
	{
		// Core is already initialized -OR- another thread will initialize the core
		
		// make sure critical section has been initialized
		while(gsi_is_false(aCore->mIsStaticInitComplete))
			msleep(GSI_CORE_INIT_YIELD_MS);

		// take the critical section
		// this is necessary in case another thread began shutdown before we incremented ref count
		gsiEnterCriticalSection(&aCore->mQueueCrit);
		gsiLeaveCriticalSection(&aCore->mQueueCrit);

		// wait for other thread to initial core
		while(gsi_is_false(aCore->mIsInitialized))
			msleep(GSI_CORE_INIT_YIELD_MS); 
	}
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
static void gsiCoreTaskDispatchCallback(GSTask *theTask, GSTaskResult theResult)
{
	if (theTask->mIsCallbackPending)
	{
		theTask->mIsCallbackPending = 0;
		if (theTask->mCallbackFunc)
			(theTask->mCallbackFunc)(theTask->mTaskData, theResult);
	}
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Return values:
//     GSTaskResult_InProgress - Keep calling gsCoreTaskThink
//     GSTaskResult_Finished   - Task memory freed; task object is now invalid
GSTaskResult gsCoreTaskThink(GSTask* theTask)
{
	GSCoreMgr* aCore = gsiGetStaticCore();
	GSTaskResult aResult = GSTaskResult_None;

	if (theTask == NULL)
		return GSTaskResult_Finished;

	// If the task is running let it think (it may be cancelled and still running)
	if (theTask->mIsRunning && theTask->mThinkFunc)
		aResult = (theTask->mThinkFunc)(theTask->mTaskData);

	// Check for time out
	if ((!theTask->mIsCanceled) && (aResult == GSTaskResult_InProgress))
	{
		if ((theTask->mTimeout != 0) && (current_time() - theTask->mStartTime > theTask->mTimeout))
		{
			// Cancel the task...
			gsiCoreCancelTask(theTask);

			// ...but trigger callback immediately with "Timed Out"
			gsiCoreTaskDispatchCallback(theTask, GSTaskResult_TimedOut);
		}
		//else
		//    continue processing it
	}
	else if (aResult != GSTaskResult_InProgress)
	{
		// Note: This section may be triggered multiple times if the cleanup
		//       function fails.  (possibly due to lack of memory)
		int i=0;
		gsi_bool removeTask = gsi_true;

		// Call the callback if we haven't already
		if (theTask->mIsRunning)
		{
			gsiCoreTaskDispatchCallback(theTask, aResult);
			theTask->mIsRunning = 0;
		}

		// Call Cleanup hook and remove task
		if (theTask->mCleanupFunc)
			removeTask = (theTask->mCleanupFunc)(theTask->mTaskData);

		// Remove the task
		if (gsi_is_true(removeTask))
		{
			gsiEnterCriticalSection(&aCore->mQueueCrit);
			#ifdef GSICORE_DYNAMIC_TASK_LIST
			{
				int len = ArrayLength(aCore->mTaskArray);
				for (i=0; i < len; i++)
				{
					if(*(GSTask**)ArrayNth(aCore->mTaskArray, i) == theTask)
					{
						ArrayRemoveAt(aCore->mTaskArray, i);
						gsifree(theTask);
						break;
					}
				}
			}
			#else
				for (i=0; i < GSICORE_MAXTASKS; i++)
				{
					if (aCore->mTaskArray[i] == theTask)
					{
						aCore->mTaskArray[i] = NULL;
						gsifree(theTask);
						break;
					}
				}
			#endif
			gsiLeaveCriticalSection(&aCore->mQueueCrit);
			return GSTaskResult_Finished;
		}
	}

	// Note: This function should always return InProgress until
	//       the task has been removed from the TaskArray.
	//       The developer may have already received a completed callback
	//       while this continue to return InProgress meaning "still needs to be pumped"
	return GSTaskResult_InProgress;
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Optional maximum processing time
//    - Pass in 0 to process each task once
void gsCoreThink(gsi_time theMS)
{
	GSCoreMgr* aCore = gsiGetStaticCore();
	int i=0;
	gsi_time aStartTime = 0;
	gsi_i32 allTasksAreDead = 1;

	if (gsi_is_false(aCore->mIsInitialized))
		return;

	// enter queue critical section
	gsiEnterCriticalSection(&aCore->mQueueCrit);

	// start timing
	aStartTime = current_time();

	// process all tasks in the queue, dispatch callbacks
	// cancelled tasks continue processing until the cancel is acknowledge by the task
	#ifdef GSICORE_DYNAMIC_TASK_LIST
	{
		int len = ArrayLength(aCore->mTaskArray);
		if(len > 0)
			allTasksAreDead = 0;
		for(i=(len-1); i>=0; i--)
		{
			GSTask* task = *(GSTask**)ArrayNth(aCore->mTaskArray, i);
			if(gsi_is_true(task->mAutoThink))
				gsCoreTaskThink(task);
			if (theMS != 0 && (current_time()-aStartTime > theMS))
				break;
		}
	}
	#else
		for (i=0; i<GSICORE_MAXTASKS; i++)
		{
			if (aCore->mTaskArray[i] != NULL)
			{
				allTasksAreDead = 0;

				if (aCore->mTaskArray[i]->mAutoThink == gsi_true)
					gsCoreTaskThink(aCore->mTaskArray[i]);
			}
			// Enough time to process another? (if not, break)
			if (theMS != 0 && (current_time()-aStartTime > theMS))
				break;
		}
	#endif

	// shutting down?
	if (aCore->mIsShuttingDown && allTasksAreDead)
	{
		ghttpCleanup();

#ifdef GSICORE_DYNAMIC_TASK_LIST
        if(aCore->mTaskArray)
        {
            ArrayFree(aCore->mTaskArray);
            aCore->mTaskArray = NULL;
        }
#endif

		aCore->mIsShuttingDown = 0;
	}

	// leave queue critical section
	gsiLeaveCriticalSection(&aCore->mQueueCrit);
	
	GSI_UNUSED(theMS);
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
void gsCoreShutdown()
{
	GSCoreMgr* aCore = gsiGetStaticCore();
	int i=0;

	// If not initialized, just bail
	if (gsi_is_false(aCore->mIsInitialized))
		return;

	// Take the critical section to prevent anyone from re-initializing while
	// we decide if we need to shutdown
	gsiEnterCriticalSection(&aCore->mQueueCrit);

	// If there are other references, just return
	if (gsiInterlockedDecrement(&aCore->mRefCount)>0)
	{
		gsiLeaveCriticalSection(&aCore->mQueueCrit);
		return;
	}
	else
	{
		// we released the final reference, begin shutdown
		// no other thread will begin using the core until
		// mIsShuttingDown has been set back to false
		aCore->mIsShuttingDown = gsi_true;

		// Cancel all tasks
		#ifdef GSICORE_DYNAMIC_TASK_LIST
		{
			int len = ArrayLength(aCore->mTaskArray);
			for(i=0; i<len; i++)
			{
				gsiCoreCancelTask(*(GSTask**)ArrayNth(aCore->mTaskArray, i));
			}
		}
		#else
			for (i=0; i<GSICORE_MAXTASKS; i++)
			{
				if (aCore->mTaskArray[i] != NULL)
				{
					gsiCoreCancelTask(aCore->mTaskArray[i]);
				}
			}
		#endif
		gsiLeaveCriticalSection(&aCore->mQueueCrit);
	}
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
GSCoreValue gsCoreIsShutdown()
{
	GSCoreMgr* aCore = gsiGetStaticCore();

	if (gsi_is_true(aCore->mIsShuttingDown))
		return GSCore_SHUTDOWN_PENDING;
	if (aCore->mRefCount == 0)
		return GSCore_SHUTDOWN_COMPLETE;

	// The core isn't shutting down, and ref count > 0,
	// therefore the core is in use
	return GSCore_IN_USE;
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Adds a GSCoreTask to the execution array
//   - Tasks may come from multiple threads
void gsiCoreExecuteTask(GSTask* theTask, gsi_time theTimeoutMs)
{
	GSCoreMgr* aCore = gsiGetStaticCore();

	// Bail, if the task has already started
	GS_ASSERT(!theTask->mIsRunning);

	// Mark it as started and running
	theTask->mIsCallbackPending = 1;
	theTask->mIsStarted = 1;
	theTask->mIsRunning = 1;
	theTask->mTimeout = theTimeoutMs;
	theTask->mStartTime = current_time();	
	
	// Execute the task
	if (theTask->mExecuteFunc)
		(theTask->mExecuteFunc)(theTask->mTaskData);

	gsiEnterCriticalSection(&aCore->mQueueCrit);
	// add it to the process list
	#ifdef GSICORE_DYNAMIC_TASK_LIST
		ArrayAppend(aCore->mTaskArray, &theTask);
	#else
	{
		int anInsertPos = -1;
		int i=0;
		for (i=0; i<GSICORE_MAXTASKS; i++)
		{
			if (aCore->mTaskArray[i] == NULL)
			{
				anInsertPos = i;
				break;
			}
		}
		GS_ASSERT(anInsertPos != -1); // make sure it got in
		aCore->mTaskArray[anInsertPos] = theTask;
	}
	#endif
	gsiLeaveCriticalSection(&aCore->mQueueCrit);
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// cancelling a task is an *async request*
// A task that doesn't support cancelling, such as a blocking socket operation,
// may complete normally even though it was cancelled.
void gsiCoreCancelTask(GSTask* theTask)
{
	GSCoreMgr* aCore = gsiGetStaticCore();

	// Enter critical secction here so the developer 
	// may cancel a task from any thread.  (e.g. The task thread has blocked)
	gsiEnterCriticalSection(&aCore->mQueueCrit);
	if (theTask->mIsRunning && !theTask->mIsCanceled)
	{
		theTask->mIsCanceled = 1;
		if (theTask->mCancelFunc)
			(theTask->mCancelFunc)(theTask->mTaskData);
	}
	gsiLeaveCriticalSection(&aCore->mQueueCrit);
	GSI_UNUSED(aCore);
}


///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
GSTask* gsiCoreCreateTask()
{
	GSTask* aTask = (GSTask*)gsimalloc(sizeof(GSTask));
	if (aTask == NULL)
		return NULL;

	memset(aTask, 0, sizeof(GSTask));
	aTask->mAutoThink = gsi_true;
	return aTask;
}