File: esl_threads.c

package info (click to toggle)
hmmer 3.2.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 23,380 kB
  • sloc: ansic: 119,305; perl: 8,791; sh: 3,266; makefile: 1,871; python: 598
file content (443 lines) | stat: -rw-r--r-- 13,093 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
/* Simple master/worker data parallelization using POSIX threads.
 * 
 * Contents:
 *    1. The <ESL_THREADS> object: a gang of workers.
 *    2. Determining thread number to use.
 *    3. Examples.
 */
#include "esl_config.h"

#ifdef HAVE_PTHREAD

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_PARAM_H		/* On OpenBSD, sys/sysctl.h requires sys/param.h */
#include <sys/param.h>
#endif
#ifdef HAVE_SYS_SYSCTL_H
#include <sys/sysctl.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

#include "easel.h"
#include "esl_threads.h"


/*****************************************************************
 *# 1. The <ESL_THREADS> object: a gang of workers.
 *****************************************************************/ 

/* Function:  esl_threads_Create()
 * Synopsis:  Create a threads object.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Creates an <ESL_THREADS> object, for organizing
 8            a bunch of worker threads that will all run 
 *            the worker function <fnptr>. This object is a shell
 *            for now; the worker threads themselves are 
 *            created individually with <esl_threads_AddThread()>.
 *
 * Returns:   ptr to the new <ESL_THREADS> object.
 * 
 * Throws:    <NULL> on allocation or initialization failure.
 */
ESL_THREADS *
esl_threads_Create(void (*fnptr)(void *))
{
  ESL_THREADS *obj = NULL;
  int          status;

  ESL_ALLOC(obj, sizeof(ESL_THREADS));

  obj->threadCount     = 0;
  obj->threadId        = NULL;
  obj->data            = NULL;
  obj->startThread     = 0;
  obj->func            = fnptr;

  if (pthread_mutex_init(&obj->startMutex, NULL) != 0) ESL_XEXCEPTION(eslESYS, "mutex init failed");
  if (pthread_cond_init (&obj->startCond,  NULL) != 0) ESL_XEXCEPTION(eslESYS, "cond init failed");
  return obj;

 ERROR:
  return NULL;
}

/* Function:  esl_threads_Destroy()
 * Synopsis:  Destroys an <ESL_THREADS> object.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Frees an <ESL_THREADS> object.  
 *
 *            The caller routine must first free the
 *            contents of each <obj->data[]>.
 *
 * Returns:   void
 */
void 
esl_threads_Destroy(ESL_THREADS *obj)
{
  if (obj == NULL) return;

  if (obj->threadId != NULL) free(obj->threadId);
  if (obj->data     != NULL) free(obj->data);
  pthread_mutex_destroy(&obj->startMutex);
  pthread_cond_destroy (&obj->startCond);
  free(obj);
  return;
}

/* Function:  esl_threads_AddThread()
 * Synopsis:  Add a worker thread to the <ESL_THREADS> object.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Create a new worker thread for the <ESL_THREADS> object,
 *            assigning it the work unit pointed to by <data>.
 *
 *            The caller remains responsible for any memory allocated
 *            to <data>; the <ESL_THREADS> object will only manage
 *            a copy of a pointer to <data>.
 *
 * Returns:   <eslOK> on success.
 * 
 * Throws:    <eslEMEM> on allocation failure. 
 *            <eslESYS> if thread creation fails.
 *            <eslEINVAL> if something's wrong with the <obj>.
 */
int 
esl_threads_AddThread(ESL_THREADS *obj, void *data)
{
  int    status;
  void  *p;

  if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");

  /* allocate inside the ESL_THREADS object to hold another worker */
  ESL_RALLOC(obj->threadId, p, sizeof(pthread_t) * (obj->threadCount+1));
  ESL_RALLOC(obj->data,     p, sizeof(void *)    * (obj->threadCount+1));
  
  obj->data[obj->threadCount] = data;
  if (pthread_create(&(obj->threadId[obj->threadCount]), NULL, (void *(*)(void *)) obj->func, obj) != 0) ESL_EXCEPTION(eslESYS, "thread creation failed");
  obj->threadCount++;
  return eslOK;

 ERROR:
  return status;
}

/* Function:  esl_threads_GetWorkerCount()
 * Synopsis:  Return the total number of worker threads.
 * Incept:    SRE, Fri Aug 21 13:22:52 2009 [Janelia]
 *
 * Purpose:   Returns the total number of worker threads.
 */
int
esl_threads_GetWorkerCount(ESL_THREADS *obj)
{
  return obj->threadCount;
}


/* Function:  esl_threads_WaitForStart()
 * Synopsis:  Blocks master until all workers have started.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Make the master thread wait until all the worker threads have
 *            started. When all the worker threads have started and
 *            are blocking at the start mutex, release them.
 *
 * Returns:   <eslOK> on success.
 * 
 * Throws:    <eslESYS> if thread synchronization fails somewhere.
 *            <eslEINVAL> if something is awry with <obj>.
 */
int
esl_threads_WaitForStart(ESL_THREADS *obj)
{
  if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");

  if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex lock failed");

  /* wait for all worker threads to start */
  while (obj->startThread < obj->threadCount) {
    if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "wait cond failed");
  }

  /* release all the worker threads */
  obj->startThread = 0;
  if (pthread_cond_broadcast(&obj->startCond)  != 0) ESL_EXCEPTION(eslESYS, "cond broadcast failed");
  if (pthread_mutex_unlock  (&obj->startMutex) != 0) ESL_EXCEPTION(eslESYS, "mutex unlock failed");
  return eslOK;
}

/* Function:  esl_threads_WaitForFinish()
 * Synopsis:  Blocks master until all workers have completed.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Block the master thread until all the worker threads have
 *            completed. As each worker completes, remove it from the 
 *            <obj>. 
 *            
 *            Upon exit, the <obj> is returned to the same (empty)
 *            state it was in after it was created. It may be reused
 *            for a new problem by adding new workers.
 *
 * Returns:   <eslOK> on success.
 *
 * Throws:    <eslESYS> if thread synchronization fails somewhere.
 *            <eslEINVAL> if something is awry with <obj>.
 */
int 
esl_threads_WaitForFinish(ESL_THREADS *obj)
{
  int  w;			

  if (obj == NULL) ESL_EXCEPTION(eslEINVAL, "Invalid thread object");

  /* wait for all worker threads to complete */
  for (w = obj->threadCount-1; w >= 0; w--)
    {
      if (pthread_join(obj->threadId[w], NULL) != 0) ESL_EXCEPTION(eslESYS, "pthread join failed");
      obj->threadCount--;
    }

  return eslOK;
}

/* Function:  esl_threads_Started()
 * Synopsis:  Blocks worker until master gives the start signal.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Block a worker thread until master sees that all workers
 *            have started and gives the start signal. Assign the worker
 *            a unique number (0..nworkers-1), and return it in
 *            <*ret_workeridx>. The worker uses this index to 
 *            retrieve its work units.
 *
 * Returns:   <eslOK> on success.
 *
 * Throws:    <eslESYS> if thread synchronization fails somewhere.
 *            <eslEINVAL> if something is awry with <obj>.
 */
int
esl_threads_Started(ESL_THREADS *obj, int *ret_workeridx)
{
  int           w;
  pthread_t     threadId;
  int           status;

  if (obj == NULL)                                ESL_XEXCEPTION(eslEINVAL, "Invalid thread object");
  if (pthread_mutex_lock (&obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS,   "mutex lock failed");

  /* signal that we're started */
  obj->startThread++;
  if (pthread_cond_broadcast (&obj->startCond) != 0) ESL_XEXCEPTION(eslESYS, "cond broadcast failed");

  /* wait for the master's signal to start the calculations */
  while (obj->startThread) {
    if (pthread_cond_wait(&obj->startCond, &obj->startMutex) != 0) ESL_XEXCEPTION(eslESYS, "cond wait failed");
  }

  if (pthread_mutex_unlock (&obj->startMutex) != 0)  ESL_XEXCEPTION(eslESYS, "mutex unlock failed");

  /* Figure out the worker's index */
  threadId = pthread_self();
  for (w = 0; w < obj->threadCount; w++)
    if (pthread_equal(threadId, obj->threadId[w])) break;
  if (w == obj->threadCount) ESL_XEXCEPTION(eslESYS, "thread not registered");

  *ret_workeridx = w;
  return eslOK;

 ERROR:
  *ret_workeridx = 0;
  return status;
}


/* Function:  esl_threads_GetData()
 * Synopsis:  Return the data associated with this thread.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Return the data pointer associated with the worker thread
 *            <workeridx>. The data pointer was set by the 
 *            <esl_threads_AddThread()> function.
 *
 * Returns:   void *
 */
void *
esl_threads_GetData(ESL_THREADS *obj, int workeridx)
{
  return obj->data[workeridx];
}


/* Function:  esl_threads_Finished()
 * Synopsis:  Terminate the thread.
 * Incept:    MSF, Thu Jun 18 11:51:39 2009
 *
 * Purpose:   Terminate a worker thread.
 *            This is currently a no-op, serving as
 *            a placeholder in case we eventually need
 *            any cleanup.
 *
 * Returns:   <eslOK> on success.
 */
int
esl_threads_Finished(ESL_THREADS *obj, int workeridx)
{
  return eslOK;
}


/*****************************************************************
 * 2. Determining thread number to use
 *****************************************************************/

/* Function:  esl_threads_CPUCount()
 * Synopsis:  Figure out how many cpus the machine has.
 * Incept:    SRE, Wed Aug 19 11:31:24 2009 [Janelia]
 *
 * Purpose:   Determine the number of logical processors on this
 *            machine; return that number in <*ret_ncpu>.
 *            
 *            The number of available processors is found by
 *            <sysconf(_SC_NPROCESSORS_ONLN)>,
 *            <sysconf(_SC_NPROC_ONLN)>, or a <sysctl()> call,
 *            depending on the host system.  This determined number of
 *            available processors will be the number of logical
 *            processors, not physical processors. On systems with
 *            hyperthreading, the number of logical processors is more
 *            than the number of physical cpus. It may or may not be a
 *            good thing to spawn more threads than physical
 *            processors.
 *            
 * Args:      ret_ncpu  - RETURN: number of logical CPUs
 *
 * Returns:   <eslOK> on success.
 *
 * Throws:    (no abnormal error conditions)
 *
 * Xref:      J5/68
 */
int
esl_threads_CPUCount(int *ret_ncpu)
{
  int   ncpu = 1;

#if defined     (HAVE_SYSCONF) && defined (_SC_NPROCESSORS_ONLN)     /* Many systems (including Linux) */
  ncpu = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined   (HAVE_SYSCONF) && defined (_SC_NPROC_ONLN)	     /* Silicon Graphics IRIX */
  ncpu = sysconf(_SC_NPROC_ONLN);
#elif defined   (HAVE_SYSCTL)	                                     /* BSD systems including OS/X */
  int    mib[2] = {CTL_HW, HW_NCPU};
  size_t len    = sizeof(int);
  int    status;

  status = sysctl(mib, 2, &ncpu, &len, NULL, (size_t) NULL);
  if (status < 0 || len != sizeof(int)) ncpu = 1;
#endif
  
  if (ncpu < 1) ncpu = 1;

  *ret_ncpu = ncpu;
  return eslOK;
}


/* Function:  esl_threads_GetCPUCount()
 * Synopsis:  Returns the number of CPU cores on machine.
 * Incept:    SRE, Mon Aug 21 08:52:29 2017
 *
 * Purpose:   Identical to <esl_threads_CPUCount()>, except
 *            it directly returns the result.
 */
int
esl_threads_GetCPUCount(void)
{
  static int ncpu = -1;                         // so we only make system calls once.
  if (ncpu == -1) esl_threads_CPUCount(&ncpu);
  return ncpu;
}


/*****************************************************************
 * 3. Example
 *****************************************************************/

#ifdef eslTHREADS_EXAMPLE
#include "easel.h"
#include "esl_threads.h"

/* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example -I. -DeslTHREADS_EXAMPLE esl_threads.c easel.c */
static void 		
worker_thread(void *data)
{
  ESL_THREADS *thr = (ESL_THREADS *) data;
  char        *s   = NULL;
  int          w;

  esl_threads_Started(thr, &w);
  
  s = (char *) esl_threads_GetData(thr, w);
  printf("worker thread %d receives: %s\n", w, s);

  esl_threads_Finished(thr, w);
  return;
}

int
main(void)
{
  ESL_THREADS  *thr  = NULL;
  int           ncpu = 8;
  int           i;
  char        **work = NULL;

  work = malloc(sizeof(char *) * ncpu);
  for (i = 0; i < ncpu; i++) 
    esl_sprintf(&(work[i]), "work packet %d", i);

  thr = esl_threads_Create(&worker_thread);

  for (i = 0; i < ncpu; i++)
    esl_threads_AddThread(thr, (void *) work[i]);

  esl_threads_WaitForStart (thr);
  /* The worker threads now run their work. */
  esl_threads_WaitForFinish(thr);
  esl_threads_Destroy(thr);
  for (i = 0; i < ncpu; i++) free(work[i]);
  free(work);
  return eslOK;
}
#endif /*eslTHREADS_EXAMPLE*/


#ifdef eslTHREADS_EXAMPLE2
#include "easel.h"
#include "esl_threads.h"

/* gcc --std=gnu99 -g -Wall -pthread -o esl_threads_example2 -I. -DeslTHREADS_EXAMPLE2 esl_threads.c easel.c */
int 
main(void)
{
  int ncpu;

  esl_threads_CPUCount(&ncpu);
  printf("Processors: %d\n", ncpu);

  return eslOK;
}
#endif /*eslTHREADS_EXAMPLE2*/
#endif /*HAVE_PTHREAD*/