File: mainloop-worker.c

package info (click to toggle)
syslog-ng 4.8.1-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,456 kB
  • sloc: ansic: 177,631; python: 13,035; cpp: 11,611; makefile: 7,012; sh: 5,147; java: 3,651; xml: 3,344; yacc: 1,377; lex: 599; perl: 193; awk: 190; objc: 162
file content (464 lines) | stat: -rw-r--r-- 14,260 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
/*
 * Copyright (c) 2002-2013 Balabit
 * Copyright (c) 1998-2013 Balázs Scheidler
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */
#include "mainloop-worker.h"
#include "mainloop-call.h"
#include "tls-support.h"
#include "apphook.h"
#include "messages.h"
#include "scratch-buffers.h"
#include "atomic.h"

#include <iv.h>

/* Per-thread state of the worker machinery.  The members are declared
 * between TLS_BLOCK_START and TLS_BLOCK_END and are accessed through the
 * __tls_deref() based accessor macros defined right after the block. */
TLS_BLOCK_START
{
  /* Thread IDs are low numbered integers that can be used to index
   * per-thread data in an array.  IDs get reused and the smallest possible
   * ID is allocated for newly started threads.  */

  /* the thread id is shifted by one, to make 0 the uninitialized state,
   * i.e. everything that sets it adds +1, everything that queries it
   * subtracts 1 */
  gint main_loop_worker_id;
  /* worker type as passed to main_loop_worker_thread_start() */
  MainLoopWorkerType main_loop_worker_type;
  /* WorkerBatchCallback entries registered by this thread that are still
   * waiting to be invoked */
  struct iv_list_head batch_callbacks;
}
TLS_BLOCK_END;

/* let the rest of the file use the thread-local members as if they were
 * plain variables */
#define main_loop_worker_id __tls_deref(main_loop_worker_id)
#define batch_callbacks    __tls_deref(batch_callbacks)
#define main_loop_worker_type __tls_deref(main_loop_worker_type)

/* SyncCallAction entries queued by _register_sync_call_action() and
 * executed (then freed) by _invoke_sync_call_actions() */
GQueue sync_call_actions = G_QUEUE_INIT;

/* cause workers to stop, no new I/O jobs to be submitted */
volatile gboolean main_loop_workers_quit;
/* cleared by _reenable_worker_jobs(), which also logs the "Configuration
 * reload finished" notice if the flag was set; it is not set anywhere in
 * this file */
volatile gboolean is_reloading_scheduled;

/* number of I/O worker jobs running */
static GAtomicCounter main_loop_jobs_running;

/* task whose handler is _reenable_worker_jobs(), registered by
 * main_loop_worker_job_complete() once the last job finished while
 * main_loop_workers_quit was set */
static struct iv_task main_loop_workers_reenable_jobs_task;

/* thread ID allocation */
static GMutex main_loop_workers_idmap_lock;

/* the ID map is a bitmap stored in guint64 rows, one bit per thread ID */
#define MAIN_LOOP_IDMAP_BITS_PER_ROW    (sizeof(guint64)*8)
#define MAIN_LOOP_IDMAP_ROWS            (MAIN_LOOP_MAX_WORKER_THREADS / MAIN_LOOP_IDMAP_BITS_PER_ROW)

/* a set bit means that the given thread index is in use; protected by
 * main_loop_workers_idmap_lock */
static guint64 main_loop_workers_idmap[MAIN_LOOP_IDMAP_ROWS];
/* upper limit for usable thread IDs, set by
 * main_loop_worker_finalize_thread_space() */
static gint main_loop_max_workers = 0;
/* running total collected by main_loop_worker_allocate_thread_space() */
static gint main_loop_estimated_number_of_workers = 0;

/* NOTE: return a zero based index for the current thread, to be used in
 * array indexes.  -1 means that the thread does not have an ID */
gint
main_loop_worker_get_thread_index(void)
{
  return main_loop_worker_id - 1;
}

static void
_allocate_thread_id(void)
{
  g_mutex_lock(&main_loop_workers_idmap_lock);

  /* the maximum number of threads must be dividible by 64, the array
   * main_loop_workers_idmap is sized accordingly, e.g.  the remainder could
   * not be represented in the array as is.  */

  G_STATIC_ASSERT((MAIN_LOOP_MAX_WORKER_THREADS % MAIN_LOOP_IDMAP_BITS_PER_ROW) == 0);

  main_loop_worker_id = 0;

  for (gint thread_index = 0; thread_index < MAIN_LOOP_MAX_WORKER_THREADS; thread_index++)
    {
      gint row = thread_index / MAIN_LOOP_IDMAP_BITS_PER_ROW;
      gint bit_in_row = thread_index % MAIN_LOOP_IDMAP_BITS_PER_ROW;

      if ((main_loop_workers_idmap[row] & (1ULL << bit_in_row)) == 0)
        {
          /* thread_index not yet used */

          main_loop_workers_idmap[row] |= (1ULL << bit_in_row);
          main_loop_worker_id = (thread_index + 1);
          break;
        }
    }
  g_mutex_unlock(&main_loop_workers_idmap_lock);

  if (main_loop_worker_id == 0)
    {
      msg_warning_once("Unable to allocate a unique thread ID. This can only "
                       "happen if the number of syslog-ng worker threads exceeds the "
                       "compile time constant MAIN_LOOP_MAX_WORKER_THREADS. "
                       "This is not a fatal problem but can be a cause for "
                       "decreased performance. Increase this number and recompile "
                       "or contact the syslog-ng authors",
                       evt_tag_int("max-worker-threads-hard-limit", MAIN_LOOP_MAX_WORKER_THREADS));
    }

  if (main_loop_worker_id > main_loop_max_workers)
    {
      msg_warning_once("The actual number of worker threads exceeds the number of threads "
                       "estimated at startup. This indicates a bug in thread estimation, "
                       "which is not fatal but could cause decreased performance. Please "
                       "contact the syslog-ng authors with your config to help troubleshoot "
                       "this issue",
                       evt_tag_int("worker-id", main_loop_worker_id),
                       evt_tag_int("max-worker-threads", main_loop_max_workers));
      main_loop_worker_id = 0;
    }
}

static void
_release_thread_id(void)
{
  g_mutex_lock(&main_loop_workers_idmap_lock);
  if (main_loop_worker_id)
    {
      const gint thread_index = main_loop_worker_get_thread_index();
      gint row = thread_index / MAIN_LOOP_IDMAP_BITS_PER_ROW;
      gint bit_in_row = thread_index % MAIN_LOOP_IDMAP_BITS_PER_ROW;

      main_loop_workers_idmap[row] &= ~(1ULL << (bit_in_row));
      main_loop_worker_id = 0;
    }
  g_mutex_unlock(&main_loop_workers_idmap_lock);
}

gboolean
main_loop_worker_is_worker_thread(void)
{
  return main_loop_worker_type > MLW_UNKNOWN;
}

/* A callback (and its argument) to be invoked when worker threads are
 * requested to exit, see _request_all_threads_to_exit(). */
typedef struct _WorkerExitNotification
{
  WorkerExitNotificationFunc func;
  gpointer user_data;
} WorkerExitNotification;

/* WorkerExitNotification entries in registration order; the list is
 * emptied every time the callbacks are invoked */
static GList *exit_notification_list = NULL;

/* Register func to be called with user_data the next time the worker
 * threads are requested to exit.  The registration is appended to
 * exit_notification_list and is dropped once the callback was invoked. */
void
main_loop_worker_register_exit_notification_callback(WorkerExitNotificationFunc func, gpointer user_data)
{
  WorkerExitNotification *notification = g_new(WorkerExitNotification, 1);

  *notification = (WorkerExitNotification)
  {
    .func = func,
    .user_data = user_data,
  };
  exit_notification_list = g_list_append(exit_notification_list, notification);
}

/* Call the function stored in a single exit notification with its
 * registered user_data. */
static void
_invoke_worker_exit_callback(WorkerExitNotification *func)
{
  WorkerExitNotificationFunc callback = func->func;
  gpointer callback_data = func->user_data;

  callback(callback_data);
}

static void
_request_all_threads_to_exit(void)
{
  g_list_foreach(exit_notification_list, (GFunc) _invoke_worker_exit_callback, NULL);
  g_list_foreach(exit_notification_list, (GFunc) g_free, NULL);
  g_list_free(exit_notification_list);
  exit_notification_list = NULL;
  main_loop_workers_quit = TRUE;
}

/* Call this function from worker threads, when you start up.
 *
 * Records worker_type for the calling thread, allocates its thread ID,
 * initializes its batch callback list, increments
 * main_loop_workers_running under workers_running_lock and finally calls
 * app_thread_start(). */
void
main_loop_worker_thread_start(MainLoopWorkerType worker_type)
{
  main_loop_worker_type = worker_type;

  _allocate_thread_id();
  INIT_IV_LIST_HEAD(&batch_callbacks);

  /* NOTE(review): workers_running_lock and main_loop_workers_running are
   * not declared in this file, presumably they come from a header */
  g_mutex_lock(&workers_running_lock);
  main_loop_workers_running++;
  g_mutex_unlock(&workers_running_lock);

  app_thread_start();
}

/* Call this function from worker threads, when you stop.
 *
 * Calls app_thread_stop(), releases the thread ID of the calling thread,
 * then decrements main_loop_workers_running and signals thread_halt_cond,
 * both while holding workers_running_lock. */
void
main_loop_worker_thread_stop(void)
{
  app_thread_stop();
  _release_thread_id();

  g_mutex_lock(&workers_running_lock);
  main_loop_workers_running--;
  /* wake up whoever is waiting on thread_halt_cond; the waiter is not
   * part of this file */
  g_cond_signal(&thread_halt_cond);
  g_mutex_unlock(&workers_running_lock);
}

/* Run the garbage collection of workers, which currently consists of a
 * single scratch_buffers_explicit_gc() call. */
void
main_loop_worker_run_gc(void)
{
  scratch_buffers_explicit_gc();
}

/*
 * This function is called in the main thread prior to starting the
 * processing of a work item in a worker thread.
 *
 * It atomically increments main_loop_jobs_running; the matching decrement
 * is done by main_loop_worker_job_complete().
 */
void
main_loop_worker_job_start(void)
{
  g_atomic_counter_inc(&main_loop_jobs_running);
}

/* A function and its argument, queued by _register_sync_call_action() and
 * executed then freed by _consume_action(). */
typedef struct
{
  void (*func)(gpointer user_data);
  gpointer user_data;
} SyncCallAction;

/* Allocate a SyncCallAction for func/user_data and push it to the tail of
 * the queue q.  The entry is freed by _consume_action(). */
void
_register_sync_call_action(GQueue *q, void (*func)(gpointer user_data), gpointer user_data)
{
  SyncCallAction *entry = g_new0(SyncCallAction, 1);

  *entry = (SyncCallAction)
  {
    .func = func,
    .user_data = user_data,
  };
  g_queue_push_tail(q, entry);
}

/* Execute a queued action with its stored argument, then free the
 * action. */
void
_consume_action(SyncCallAction *action)
{
  void (*callback)(gpointer user_data) = action->func;
  gpointer callback_data = action->user_data;

  callback(callback_data);
  g_free(action);
}

static void
_invoke_sync_call_actions(void)
{
  while (!g_queue_is_empty(&sync_call_actions))
    {
      SyncCallAction *action = g_queue_pop_head(&sync_call_actions);
      _consume_action(action);
    }
}

/*
 * This function is called in the main thread after a job was finished in
 * one of the worker threads.
 *
 * It decrements the number of running jobs.  If the workers were asked to
 * quit (main_loop_workers_quit is set) and this was the last running job,
 * the task that reenables job submission and runs the pending sync call
 * actions gets scheduled.
 */
void
main_loop_worker_job_complete(void)
{
  main_loop_assert_main_thread();

  const gboolean last_job_finished = g_atomic_counter_dec_and_test(&main_loop_jobs_running);

  if (!main_loop_workers_quit || !last_job_finished)
    return;

  /* NOTE: we can't reenable I/O jobs by setting main_loop_workers_quit to
   * FALSE right here, because a task generated by the old config might
   * still be sitting in the task queue, to be invoked once we return from
   * here.  Tasks cannot be cancelled, thus we have to get to the end of
   * the currently running task queue.
   *
   * Thus we register another task (&main_loop_workers_reenable_jobs_task),
   * which is guaranteed to be added to the end of the task queue, which
   * reenables task submission.
   *
   * A second constraint is that any tasks submitted by the sync call
   * actions (invoked from _reenable_worker_jobs(), the handler of the
   * task registered below), MUST be registered after the reenable task,
   * because otherwise some I/O events will be missed, due to
   * main_loop_workers_quit being TRUE.
   *
   *
   *   |OldTask1|OldTask2|OldTask3| ReenableTask |NewTask1|NewTask2|NewTask3|
   *   ^
   *   | ivykis task list
   *
   * OldTasks get dropped because _quit is TRUE, NewTasks have to be
   * executed properly, otherwise we'd hang.
   */
  iv_task_register(&main_loop_workers_reenable_jobs_task);
}

/*
 * Register a function to be called back when the current I/O job is
 * finished (in the worker thread).
 *
 * The callback is linked into the batch_callbacks list of the calling
 * thread, so several callbacks can be pending at the same time.  They are
 * unlinked and invoked by main_loop_worker_invoke_batch_callbacks().
 */
void
main_loop_worker_register_batch_callback(WorkerBatchCallback *cb)
{
  iv_list_add(&cb->list, &batch_callbacks);
}

/* Invoke the batch callbacks pending in the calling thread.  Each entry
 * is unlinked from the list before its function is called. */
void
main_loop_worker_invoke_batch_callbacks(void)
{
  struct iv_list_head *cursor, *lookahead;

  iv_list_for_each_safe(cursor, lookahead, &batch_callbacks)
  {
    WorkerBatchCallback *pending = iv_list_entry(cursor, WorkerBatchCallback, list);

    /* unlink first, then call: the entry is off the list by the time its
     * function runs */
    iv_list_del_init(&pending->list);
    pending->func(pending->user_data);
  }
}

/* Abort (via g_assert) if the calling thread still has batch callbacks
 * registered that have not been invoked. */
void
main_loop_worker_assert_batch_callbacks_were_processed(void)
{
  g_assert(iv_list_empty(&batch_callbacks));
}

/* Run the queued sync call actions, then allow job submission again by
 * clearing main_loop_workers_quit.  Also closes a scheduled reload: logs a
 * notice and clears is_reloading_scheduled.
 *
 * Used as the handler of main_loop_workers_reenable_jobs_task and called
 * directly by main_loop_worker_sync_call(); the argument is unused. */
static void
_reenable_worker_jobs(void *s)
{
  _invoke_sync_call_actions();
  main_loop_workers_quit = FALSE;

  if (is_reloading_scheduled)
    {
      msg_notice("Configuration reload finished");
    }
  is_reloading_scheduled = FALSE;
}

/* Queue func(user_data) to be executed in the main thread at a point when
 * no worker jobs are running.  Must be called from the main thread.
 *
 * If no job is running, the queued actions are executed right away.
 * Otherwise the workers are asked to exit and the actions are run once the
 * last job completes (see main_loop_worker_job_complete()). */
void
main_loop_worker_sync_call(void (*func)(gpointer user_data), gpointer user_data)
{
  main_loop_assert_main_thread();

  _register_sync_call_action(&sync_call_actions, func, user_data);

  /*
   * Reading the atomic counter and acting on its value in two steps might
   * seem racy, but it is safe, because:
   *
   *   - the only case main_loop_jobs_running is incremented from a
   *     non-main thread is when slave jobs are submitted to the worker
   *     pool
   *
   *   - slave jobs are submitted by worker jobs, at a point where
   *     main_loop_jobs_running cannot be zero (since they are running)
   *
   *   - main_loop_jobs_running is always decremented in the main thread
   *     (in main_loop_worker_job_complete)
   *
   *   - this function is called by the main thread.
   *
   * So once the counter is zero there is no concurrency left.  If it is
   * non-zero, the _complete() callbacks are yet to run, but that always
   * happens in the thread we are executing in now.
   */
  if (g_atomic_counter_get(&main_loop_jobs_running) != 0)
    {
      _request_all_threads_to_exit();
      return;
    }

  _reenable_worker_jobs(NULL);
}

/* This function is intended to be used from test programs to properly
 * synchronize threaded worker startups and then trigger everything to exit
 * and wait for that too.  This is useful in LogThreadedDestDriver test
 * cases, where the test program itself is the "main" thread and we don't
 * want to launch an entire main loop, because in that case we'd be forced
 * to feed the worker thread from ivykis callbacks, which is a lot more
 * difficult to write/maintain.
 *
 * This function clearly shows the level of ugly couplings between the
 * various mainloop components.  (e.g.  mainloop and mainloop worker).  I
 * consider that this should be part of the mainloop layer (semantically, it
 * is the main loop that we are launching in a special mode.  This is also
 * indicated by the iv_main() call below). However its implementation
 * requires access to the main_loop_workers variable.
 */
void
main_loop_sync_worker_startup_and_teardown(void)
{
  struct iv_task request_exit;
  if (g_atomic_counter_get(&main_loop_jobs_running) == 0)
    return;

  IV_TASK_INIT(&request_exit);
  request_exit.handler = (void (*)(void *)) _request_all_threads_to_exit;
  iv_task_register(&request_exit);
  _register_sync_call_action(&sync_call_actions, (void (*)(gpointer user_data)) iv_quit, NULL);
  iv_main();
}

/* Return the thread limit computed by the last
 * main_loop_worker_finalize_thread_space() call (0 before the first one).
 * Thread IDs above this limit are rejected by _allocate_thread_id(). */
gint
main_loop_worker_get_max_number_of_threads(void)
{
  return main_loop_max_workers;
}

/* Add num_threads to the running estimate of worker threads.  The
 * estimate becomes effective when
 * main_loop_worker_finalize_thread_space() is called. */
void
main_loop_worker_allocate_thread_space(gint num_threads)
{
  main_loop_estimated_number_of_workers += num_threads;
}

void
main_loop_worker_finalize_thread_space(void)
{
  main_loop_max_workers = main_loop_estimated_number_of_workers;
  main_loop_estimated_number_of_workers = 0;
}

/* Application hook registered for AH_CONFIG_PRE_INIT by
 * main_loop_worker_init(): finalizes the thread space estimation.  Both
 * arguments are unused. */
static void
__pre_init_hook(gint type, gpointer user_data)
{
  main_loop_worker_finalize_thread_space();
}

void
main_loop_worker_init(void)
{
  IV_TASK_INIT(&main_loop_workers_reenable_jobs_task);
  main_loop_workers_reenable_jobs_task.handler = _reenable_worker_jobs;
  register_application_hook(AH_CONFIG_PRE_INIT, __pre_init_hook, NULL, AHM_RUN_REPEAT);
}

/* Counterpart of main_loop_worker_init(); currently there is nothing to
 * tear down. */
void
main_loop_worker_deinit(void)
{
}