File: mainloop-worker.c

package info (click to toggle)
syslog-ng 3.8.1-10
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 47,320 kB
  • ctags: 43,937
  • sloc: ansic: 159,432; yacc: 25,059; sh: 13,574; makefile: 4,669; python: 3,468; java: 3,218; xml: 2,309; perl: 318; lex: 316; awk: 184
file content (364 lines) | stat: -rw-r--r-- 10,481 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
/*
 * Copyright (c) 2002-2013 Balabit
 * Copyright (c) 1998-2013 Balázs Scheidler
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */
#include "mainloop-worker.h"
#include "mainloop-call.h"
#include "tls-support.h"
#include "apphook.h"

#include <iv.h>

/* Kind of a worker thread.  The value is also used as an index into
 * main_loop_workers_idmap, MAIN_LOOP_WORKER_TYPE_MAX being the number of
 * kinds (and thus the size of that array). */
typedef enum { GENERAL_THREAD = 0, OUTPUT_THREAD, EXTERNAL_INPUT_THREAD, MAIN_LOOP_WORKER_TYPE_MAX} MainLoopWorkerType;

/* Per-thread state of the worker infrastructure, accessed through the
 * __tls_deref() shorthands defined right after the block. */
TLS_BLOCK_START
{
  /* Thread IDs are low numbered integers that can be used to index
   * per-thread data in an array.  IDs get reused and the smallest possible
   * ID is allocated for newly started threads.  */

  /* the thread id is shifted by one, to make 0 the uninitialized state,
   * e.g. everything that sets it adds +1, everything that queries it
   * subtracts 1 */
  gint main_loop_worker_id;
  /* kind of the current thread, assigned in main_loop_worker_thread_start() */
  MainLoopWorkerType main_loop_worker_type;
  /* callbacks queued by main_loop_worker_register_batch_callback() and
   * consumed by main_loop_worker_invoke_batch_callbacks() */
  struct iv_list_head batch_callbacks;
}
TLS_BLOCK_END;

/* shorthands that make the thread-local fields look like plain variables */
#define main_loop_worker_id __tls_deref(main_loop_worker_id)
#define batch_callbacks    __tls_deref(batch_callbacks)
#define main_loop_worker_type __tls_deref(main_loop_worker_type)


/* cause workers to stop, no new I/O jobs to be submitted; set to TRUE by
 * _request_all_threads_to_exit() and back to FALSE by
 * _reenable_worker_jobs() */
volatile gboolean main_loop_workers_quit;

/* number of I/O worker jobs running; only changed by
 * main_loop_worker_job_start()/main_loop_worker_job_complete(), both of
 * which assert that they run in the main thread */
static gint main_loop_workers_running;

/* the function to be called when all threads have exited */
static void (*main_loop_workers_sync_func)(void);
/* task that re-enables job submission, see main_loop_worker_job_complete() */
static struct iv_task main_loop_workers_reenable_jobs_task;

/* thread ID allocation: the lock protects the ID map below */
static GStaticMutex main_loop_workers_idmap_lock = G_STATIC_MUTEX_INIT;

/* one 64 bit wide bitmap of allocated IDs for each MainLoopWorkerType */
static guint64 main_loop_workers_idmap[MAIN_LOOP_WORKER_TYPE_MAX];

static void
_allocate_thread_id(void)
{
  gint id;

  g_static_mutex_lock(&main_loop_workers_idmap_lock);

  /* NOTE: this algorithm limits the number of I/O worker threads to 64,
   * since the ID map is stored in a single 64 bit integer.  If we ever need
   * more threads than that, we can generalize this algorithm further. */

  main_loop_worker_id = 0;

  if(main_loop_worker_type != EXTERNAL_INPUT_THREAD)
    {
      for (id = 0; id < MAIN_LOOP_MAX_WORKER_THREADS; id++)
        {
          if ((main_loop_workers_idmap[main_loop_worker_type] & (1 << id)) == 0)
            {
              /* id not yet used */

              main_loop_worker_id = (id + 1)  + (main_loop_worker_type * MAIN_LOOP_MAX_WORKER_THREADS);
              main_loop_workers_idmap[main_loop_worker_type] |= (1 << id);
              break;
            }
        }
    }
  g_static_mutex_unlock(&main_loop_workers_idmap_lock);
}

static void
_release_thread_id(void)
{
  g_static_mutex_lock(&main_loop_workers_idmap_lock);
  if (main_loop_worker_id)
    {
      main_loop_workers_idmap[main_loop_worker_type] &= ~(1 << (main_loop_worker_id - 1));
      main_loop_worker_id = 0;
    }
  g_static_mutex_unlock(&main_loop_workers_idmap_lock);
}

/* Overrides the worker ID of the calling thread without touching the ID
 * map.
 *
 * NOTE: only used by the unit test program to emulate worker threads with
 * LogQueue, other threads acquire a thread id when they start up.
 *
 * The value is stored shifted by one (id + 1) so that 0 keeps meaning
 * "uninitialized"; main_loop_worker_get_thread_id() undoes the shift. */
void
main_loop_worker_set_thread_id(gint id)
{
  main_loop_worker_id = id + 1;
}

/* Returns the worker ID of the calling thread, undoing the +1 shift that
 * was applied when it was stored.  The result is -1 when the stored value
 * is 0, i.e. no ID is assigned (for instance after _release_thread_id(), or
 * when _allocate_thread_id() did not hand out an ID). */
gint
main_loop_worker_get_thread_id(void)
{
  return main_loop_worker_id - 1;
}

/* A pending "please exit" notification: a function and the argument it is
 * to be called with. */
typedef struct _WorkerExitNotification
{
  /* function to call when worker threads are requested to exit */
  WorkerExitNotificationFunc func;
  /* argument handed to func */
  gpointer user_data;
} WorkerExitNotification;

/* list of WorkerExitNotification entries; filled by
 * _register_exit_notification_callback(), consumed and emptied by
 * _request_all_threads_to_exit() */
static GList *exit_notification_list = NULL;

/*
 * Queue an exit notification: remember @func together with @user_data at
 * the end of exit_notification_list.  The entry is heap allocated and gets
 * freed by _request_all_threads_to_exit() once it has been invoked.
 */
static void
_register_exit_notification_callback(WorkerExitNotificationFunc func, gpointer user_data)
{
  WorkerExitNotification *notification = g_new(WorkerExitNotification, 1);

  notification->user_data = user_data;
  notification->func = func;
  exit_notification_list = g_list_append(exit_notification_list, notification);
}

/* Runs a single exit notification: calls the stored function with the
 * user_data it was registered with. */
static void
_invoke_worker_exit_callback(WorkerExitNotification *func)
{
  func->func(func->user_data);
}

static void
_request_all_threads_to_exit(void)
{
  g_list_foreach(exit_notification_list, (GFunc) _invoke_worker_exit_callback, NULL);
  g_list_foreach(exit_notification_list, (GFunc) g_free, NULL);
  g_list_free(exit_notification_list);
  exit_notification_list = NULL;
  main_loop_workers_quit = TRUE;
}

/* Call this function from worker threads, when you start up */
void
main_loop_worker_thread_start(void *cookie)
{
  WorkerOptions *worker_options = cookie;
  main_loop_worker_type = GENERAL_THREAD;

  if (worker_options && worker_options->is_output_thread)
    {
      main_loop_worker_type = OUTPUT_THREAD;
    }
  else if(worker_options && worker_options->is_external_input)
    {
      main_loop_worker_type = EXTERNAL_INPUT_THREAD;
    }

  _allocate_thread_id();
  INIT_IV_LIST_HEAD(&batch_callbacks);
  app_thread_start();
}

/* Call this function from worker threads, when you stop.
 *
 * Runs app_thread_stop() first, and only then gives back the thread ID
 * that was allocated in main_loop_worker_thread_start(). */
void
main_loop_worker_thread_stop(void)
{
  app_thread_stop();
  _release_thread_id();
}

/*
 * This function is called in the main thread prior to starting the
 * processing of a work item in a worker thread.
 *
 * It only accounts for the new job in main_loop_workers_running; the
 * matching decrement happens in main_loop_worker_job_complete().
 */
void
main_loop_worker_job_start(void)
{
  main_loop_assert_main_thread();

  main_loop_workers_running++;
}

/*
 * This function is called in the main thread after a job was finished in
 * one of the worker threads.
 *
 * If an intrusive operation (reload, termination) is pending and the number
 * of workers has dropped to zero, it commences with the intrusive
 * operation, as in that case we can safely assume that all workers exited.
 *
 * "Pending" means that main_loop_workers_quit is set, which within this
 * file is done by main_loop_worker_sync_call() (through
 * _request_all_threads_to_exit()) after it has stored the function to run
 * in main_loop_workers_sync_func.
 */
void
main_loop_worker_job_complete(void)
{
  main_loop_assert_main_thread();

  main_loop_workers_running--;
  if (main_loop_workers_quit && main_loop_workers_running == 0)
    {
       /* NOTE: we can't reenable I/O jobs by setting
        * main_loop_workers_quit to FALSE right here, because a task
        * generated by the old config might still be sitting in the task
        * queue, to be invoked once we return from here.  Tasks cannot be
        * cancelled, thus we have to get to the end of the currently running
        * task queue.
        *
        * Thus we register another task
        * (&main_loop_workers_reenable_jobs_task), which is guaranteed to
        * be added to the end of the task queue, which reenables task
        * submission.
        *
        *
        * A second constraint is that any tasks submitted by the reload
        * logic (sitting behind the sync_func() call below), MUST be
        * registered after the reenable_jobs_task, because otherwise some
        * I/O events will be missed, due to main_loop_workers_quit being
        * TRUE.
        *
        *
        *   |OldTask1|OldTask2|OldTask3| ReenableTask |NewTask1|NewTask2|NewTask3|
        *   ^
        *   | ivykis task list
        *
        * OldTasks get dropped because _quit is TRUE, NewTasks have to be
        * executed properly, otherwise we'd hang.
        */

      /* the order of the next two statements is what implements the
       * constraints described above: reenable task first, sync_func second */
      iv_task_register(&main_loop_workers_reenable_jobs_task);
      main_loop_workers_sync_func();
    }
}

/*
 * Register a function to be called back when the current I/O job is
 * finished (in the worker thread).
 *
 * The callback is linked into the calling thread's batch_callbacks list
 * through cb->list, so several callbacks can be pending at the same time.
 * They are run and unlinked again by
 * main_loop_worker_invoke_batch_callbacks().
 *
 * NOTE(review): nothing here checks whether @cb is already linked --
 * presumably callers must not register the same WorkerBatchCallback again
 * before it was invoked; confirm against the callers.
 */
void
main_loop_worker_register_batch_callback(WorkerBatchCallback *cb)
{
  iv_list_add(&cb->list, &batch_callbacks);
}

/*
 * Run every batch callback that is pending in the calling thread.
 *
 * Each callback is invoked with its own user_data and is unlinked from the
 * per-thread list (and reinitialized, so that it can be registered again)
 * right after it returned.
 */
void
main_loop_worker_invoke_batch_callbacks(void)
{
  struct iv_list_head *pos, *next;

  /* the "safe" iterator is needed as entries are removed while walking */
  iv_list_for_each_safe(pos, next, &batch_callbacks)
    {
      WorkerBatchCallback *callback = iv_list_entry(pos, WorkerBatchCallback, list);

      callback->func(callback->user_data);
      iv_list_del_init(&callback->list);
    }
}

/* Start-up arguments of a worker thread: allocated by
 * main_loop_create_worker_thread(), consumed and freed by
 * _worker_thread_func(). */
typedef struct _WorkerThreadParams
{
  /* the body of the worker thread */
  WorkerThreadFunc func;
  /* argument passed to func */
  gpointer data;
  /* passed to main_loop_worker_thread_start() to select the thread type;
   * may be NULL */
  WorkerOptions *worker_options;
} WorkerThreadParams;

/*
 * Entry point of the threads started by main_loop_create_worker_thread().
 *
 * @st is the heap allocated WorkerThreadParams of the thread, ownership of
 * which is taken over (it is freed before returning).  The worker function
 * is wrapped in the thread start/stop housekeeping, and the main thread is
 * told about the finished job via main_loop_call().  Always returns NULL.
 */
static gpointer
_worker_thread_func(gpointer st)
{
  WorkerThreadParams *params = st;

  main_loop_worker_thread_start(params->worker_options);
  params->func(params->data);

  /* job completion has to be accounted for in the main thread */
  main_loop_call((MainLoopTaskFunc) main_loop_worker_job_complete, NULL, TRUE);
  main_loop_worker_thread_stop();

  /* NOTE: this assert aims to validate that the worker thread in fact
   * invokes main_loop_worker_invoke_batch_callbacks() during its operation.
   * Please do so once every couple of messages; hopefully you have a
   * natural barrier that lets you decide when.  The easiest would be
   * log-fetch-limit(), but other limits may also be applicable.
   */
  g_assert(iv_list_empty(&batch_callbacks));

  g_free(st);
  return NULL;
}

/*
 * Start a new worker thread running @func(@data).  Must be called from the
 * main thread.
 *
 * @terminate_func, if not NULL, is registered to be called with @data when
 * the worker threads are requested to exit.  @worker_options (may be NULL)
 * is handed to main_loop_worker_thread_start() in the new thread.
 *
 * The job is accounted for (main_loop_worker_job_start()) before the
 * thread is created; the matching main_loop_worker_job_complete() is
 * issued by _worker_thread_func(), which also frees the parameter block
 * allocated here.  Failing to create the thread trips an assertion.
 */
void
main_loop_create_worker_thread(WorkerThreadFunc func, WorkerExitNotificationFunc terminate_func, gpointer data, WorkerOptions *worker_options)
{
  WorkerThreadParams *params;
  GThread *thread;

  main_loop_assert_main_thread();

  params = g_new0(WorkerThreadParams, 1);
  params->worker_options = worker_options;
  params->data = data;
  params->func = func;

  main_loop_worker_job_start();
  if (terminate_func != NULL)
    {
      _register_exit_notification_callback(terminate_func, data);
    }

  /* 1024 * 1024 is the requested stack size */
  thread = g_thread_create_full(_worker_thread_func, params, 1024 * 1024, FALSE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
  g_assert(thread != NULL);
}

/* Handler of main_loop_workers_reenable_jobs_task (installed by
 * main_loop_worker_init()): lifts the quit request and forgets the pending
 * sync function, so that a new main_loop_worker_sync_call() can be issued.
 * The argument @s is not used. */
static void
_reenable_worker_jobs(void *s)
{
  main_loop_workers_quit = FALSE;
  main_loop_workers_sync_func = NULL;
}

/*
 * Run @func once no worker job is running.
 *
 * With no running jobs @func is called right away.  Otherwise it is stored
 * as the pending sync function and all worker threads are requested to
 * exit; main_loop_worker_job_complete() calls it when the last job has
 * finished.
 *
 * Only one sync function may be pending at a time: asking for a different
 * one while another is pending trips the assertion below.
 *
 * FIXME: a variant of the assertion that was disabled earlier additionally
 * accepted the call while "under_termination".
 */
void
main_loop_worker_sync_call(void (*func)(void))
{
  g_assert(main_loop_workers_sync_func == NULL || main_loop_workers_sync_func == func);

  if (main_loop_workers_running != 0)
    {
      /* workers are still busy: defer the call until they are gone */
      main_loop_workers_sync_func = func;
      _request_all_threads_to_exit();
      return;
    }

  func();
}

/* Sets up the ivykis task that main_loop_worker_job_complete() registers
 * to re-enable job submission; its handler is _reenable_worker_jobs(). */
void
main_loop_worker_init(void)
{
  IV_TASK_INIT(&main_loop_workers_reenable_jobs_task);
  main_loop_workers_reenable_jobs_task.handler = _reenable_worker_jobs;

}

/* Counterpart of main_loop_worker_init(); currently there is nothing to
 * tear down. */
void
main_loop_worker_deinit(void)
{
}