File: logthrdestdrv.h

package info (click to toggle)
syslog-ng 4.8.1-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,456 kB
  • sloc: ansic: 177,631; python: 13,035; cpp: 11,611; makefile: 7,012; sh: 5,147; java: 3,651; xml: 3,344; yacc: 1,377; lex: 599; perl: 193; awk: 190; objc: 162
file content (326 lines) | stat: -rw-r--r-- 10,573 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
/*
 * Copyright (c) 2013, 2014 Balabit
 * Copyright (c) 2013, 2014 Gergely Nagy <algernon@balabit.hu>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * As an additional exemption you are allowed to compile & link against the
 * OpenSSL libraries as published by the OpenSSL project. See the file
 * COPYING for details.
 *
 */

#ifndef LOGTHRDESTDRV_H
#define LOGTHRDESTDRV_H

#include "syslog-ng.h"
#include "driver.h"
#include "stats/stats-registry.h"
#include "stats/aggregator/stats-aggregator.h"
#include "stats/stats-compat.h"
#include "stats/stats-cluster-key-builder.h"
#include "logqueue.h"
#include "seqnum.h"
#include "mainloop-threaded-worker.h"
#include "timeutils/misc.h"
#include "template/templates.h"

#include <iv.h>
#include <iv_event.h>

/* Flush mode handed to a worker's flush() callback (see the `mode`
 * argument of LogThreadedDestWorker::flush). */
typedef enum
{
  /* flush modes */

  /* flush the in-flight messages */
  LTF_FLUSH_NORMAL,

  /* expedite flush, to be used at reload, when the persistency of the queue
   * contents is ensured */
  LTF_FLUSH_EXPEDITE,
} LogThreadedFlushMode;

/* Result code returned by the worker insert() and flush() callbacks.
 *
 * Within this header only three values are interpreted:
 * log_threaded_dest_worker_insert() treats LTR_QUEUED, LTR_SUCCESS and
 * LTR_EXPLICIT_ACK_MGMT as outcomes after which the message delay sample
 * may be refreshed.  The handling of the remaining values is not visible
 * here.
 *
 * NOTE(review): LTR_MAX looks like an end-of-enum sentinel rather than a
 * real result -- confirm against the implementation. */
typedef enum
{
  LTR_DROP,
  LTR_ERROR,
  LTR_EXPLICIT_ACK_MGMT,
  LTR_SUCCESS,
  LTR_QUEUED,
  LTR_NOT_CONNECTED,
  LTR_RETRY,
  LTR_MAX
} LogThreadedResult;

/* Bits stored in LogThreadedDestDriver::flags.
 *
 * As used by log_threaded_dest_worker_insert():
 *   - LTDF_SEQNUM must be set for a sequence number to be assigned at all;
 *   - with LTDF_SEQNUM_ALL also set, every message gets one, otherwise only
 *     messages carrying LF_LOCAL do. */
enum
{
  LTDF_SEQNUM_ALL = 0x0001,
  LTDF_SEQNUM = 0x0002,
  /* NOTE: everything >= 0x1000 is driver specific */
};

/* Forward declarations: the two structures refer to each other (a worker
 * points back at its owner driver, the driver embeds and points at workers). */
typedef struct _LogThreadedDestDriver LogThreadedDestDriver;
typedef struct _LogThreadedDestWorker LogThreadedDestWorker;

/* State and callback table of a single destination worker.
 *
 * The inline log_threaded_dest_worker_*() wrappers in this header are the
 * intended way to invoke the callbacks at the bottom of the structure; they
 * also maintain `connected`, `seq_num`, `last_flush_time` and some of the
 * `metrics` members. */
struct _LogThreadedDestWorker
{
  MainLoopThreadedWorker thread;
  LogQueue *queue;
  /* task, events and timers; types come from <iv.h> / <iv_event.h> */
  struct iv_task  do_work;
  struct iv_event wake_up_event;
  struct iv_event shutdown_event;
  struct iv_timer timer_reopen;
  struct iv_timer timer_throttle;
  struct iv_timer timer_flush;

  /* the driver this worker belongs to; its flags, num_workers and
   * shared_seq_num are read by log_threaded_dest_worker_insert() */
  LogThreadedDestDriver *owner;

  gint worker_index;
  /* set by log_threaded_dest_worker_connect(), cleared by
   * log_threaded_dest_worker_disconnect() */
  gboolean connected;
  gint batch_size;
  gint rewound_batch_size;
  gint retries_on_error_counter;
  guint retries_counter;
  /* assigned by log_threaded_dest_worker_insert() right before the insert()
   * callback runs; 0 when no sequence number applies to the message */
  gint32 seq_num;
  /* refreshed from iv_now by log_threaded_dest_worker_flush() */
  struct timespec last_flush_time;
  gboolean enable_batching;
  gboolean suspended;
  time_t time_reopen;

  struct
  {
    GString *last_key;
  } partitioning;

  struct
  {
    StatsClusterKey *output_event_bytes_sc_key;
    StatsClusterKey *output_unreachable_key;
    StatsClusterKey *message_delay_sample_key;
    StatsClusterKey *message_delay_sample_age_key;

    StatsByteCounter written_bytes;
    /* set to !connected by the connect/disconnect wrappers */
    StatsCounterItem *output_unreachable;
    /* delay sampling is skipped entirely while message_delay_sample is NULL */
    StatsCounterItem *message_delay_sample;
    StatsCounterItem *message_delay_sample_age;

    /* ut_sec value of the last delay sample update; the insert wrapper
     * updates the sample only when the current ut_sec differs from it */
    gint64 last_delay_update;
  } metrics;

  /* Callbacks.  init, deinit, connect, disconnect and flush may be NULL: the
   * wrappers then report success / connected / LTR_SUCCESS respectively.
   * insert is called without a NULL check, so it has to be set.
   * free_fn is not invoked from this header. */
  gboolean (*init)(LogThreadedDestWorker *s);
  void (*deinit)(LogThreadedDestWorker *s);
  gboolean (*connect)(LogThreadedDestWorker *s);
  void (*disconnect)(LogThreadedDestWorker *s);
  LogThreadedResult (*insert)(LogThreadedDestWorker *s, LogMessage *msg);
  LogThreadedResult (*flush)(LogThreadedDestWorker *s, LogThreadedFlushMode mode);
  void (*free_fn)(LogThreadedDestWorker *s);
};

/* Returns a string for the given LogThreadedResult; implemented outside this
 * header.  NOTE(review): presumably the symbolic name of the result code and
 * a statically allocated string the caller must not free -- confirm. */
const gchar *log_threaded_result_to_str(LogThreadedResult self);

/* Base structure of a threaded destination driver; it extends LogDestDriver
 * through the leading `super` member and owns the worker(s) that call into
 * the destination. */
struct _LogThreadedDestDriver
{
  LogDestDriver super;

  /* driver-level statistics: cluster keys, counters and aggregators */
  struct
  {
    StatsClusterKey *output_events_sc_key;
    StatsClusterKey *processed_sc_key;
    StatsClusterKey *output_event_retries_sc_key;

    StatsCounterItem *dropped_messages;
    StatsCounterItem *processed_messages;
    StatsCounterItem *written_messages;
    StatsCounterItem *output_event_retries;

    gboolean raw_bytes_enabled;

    StatsAggregator *max_message_size;
    StatsAggregator *average_messages_size;
    StatsAggregator *max_batch_size;
    StatsAggregator *average_batch_size;
    StatsAggregator *CPS;
  } metrics;

  gint batch_lines;
  gint batch_timeout;
  gboolean under_termination;
  time_t time_reopen;
  gint retries_on_error_max;
  guint retries_max;

  struct
  {
    /* factory for worker instances; receives the index of the new worker */
    LogThreadedDestWorker *(*construct)(LogThreadedDestDriver *s, gint worker_index);

    /* this is a compatibility layer that can be removed once all drivers have
     * been migrated to the use of LogThreadedDestWorker based interface.
     * Right now, if a driver is not overriding the Worker instance, we would
     * be calling these methods from the functions named `_compat_*()`. */
    /* `instance` is the worker that log_threaded_dest_driver_flush() flushes */
    LogThreadedDestWorker instance;
    void (*thread_init)(LogThreadedDestDriver *s);
    void (*thread_deinit)(LogThreadedDestDriver *s);
    gboolean (*connect)(LogThreadedDestDriver *s);
    void (*disconnect)(LogThreadedDestDriver *s);
    LogThreadedResult (*insert)(LogThreadedDestDriver *s, LogMessage *msg);
    LogThreadedResult (*flush)(LogThreadedDestDriver *s);
  } worker;

  LogThreadedDestWorker **workers;
  /* log_threaded_dest_worker_insert() switches to the atomic sequence number
   * step when this is greater than 1 */
  gint num_workers;
  gint created_workers;
  guint last_worker;

  gboolean flush_on_key_change;
  LogTemplate *worker_partition_key;
  gint stats_source;

  /* this counter is not thread safe if there are multiple worker threads,
   * in that case, one needs to use LogThreadedDestWorker->seq_num, which is
   * static for a single insert() invocation, whereas this might be
   * increased in parallel by the multiple threads. */

  gint32 shared_seq_num;
  /* combination of LTDF_* bits (values >= 0x1000 are driver specific) */
  guint32 flags;

  const gchar *(*format_stats_key)(LogThreadedDestDriver *s, StatsClusterKeyBuilder *kb);
};

/* Invoke the worker's optional init() callback.
 *
 * Returns whatever the callback returns; a worker that does not provide
 * init() is reported as successfully initialized (TRUE). */
static inline gboolean
log_threaded_dest_worker_init(LogThreadedDestWorker *self)
{
  return self->init ? self->init(self) : TRUE;
}

/* Invoke the worker's optional deinit() callback; a no-op when the worker
 * does not provide one. */
static inline void
log_threaded_dest_worker_deinit(LogThreadedDestWorker *self)
{
  if (!self->deinit)
    return;

  self->deinit(self);
}

/* Establish the worker's connection through its optional connect() callback.
 *
 * The outcome is recorded in self->connected (a worker without connect() is
 * always considered connected) and mirrored, negated, into the
 * output_unreachable counter.  Returns the new connected state. */
static inline gboolean
log_threaded_dest_worker_connect(LogThreadedDestWorker *self)
{
  self->connected = self->connect ? self->connect(self) : TRUE;

  stats_counter_set(self->metrics.output_unreachable, !self->connected);

  return self->connected;
}

/* Tear down the worker's connection.
 *
 * Calls the optional disconnect() callback, then unconditionally marks the
 * worker as not connected and updates the output_unreachable counter to
 * match. */
static inline void
log_threaded_dest_worker_disconnect(LogThreadedDestWorker *self)
{
  void (*disconnect_cb)(LogThreadedDestWorker *s) = self->disconnect;

  if (disconnect_cb)
    disconnect_cb(self);

  self->connected = FALSE;
  stats_counter_set(self->metrics.output_unreachable, !self->connected);
}

/* Pass one message to the worker's insert() callback.
 *
 * Before the callback runs, self->seq_num is prepared:
 *   - 0 when the owner has no LTDF_SEQNUM flag, or when the message is not
 *     LF_LOCAL and LTDF_SEQNUM_ALL is not set;
 *   - otherwise the next value of the owner's shared sequence counter,
 *     stepped atomically when the owner has more than one worker.
 *
 * After the callback, if the delay sample counter is registered and the
 * result is LTR_QUEUED, LTR_SUCCESS or LTR_EXPLICIT_ACK_MGMT, the message
 * delay (now minus the LM_TS_RECVD timestamp, in msec) is published -- but
 * only when the current second differs from the second of the last update.
 *
 * Returns the result of the insert() callback unchanged. */
static inline LogThreadedResult
log_threaded_dest_worker_insert(LogThreadedDestWorker *self, LogMessage *msg)
{
  LogThreadedDestDriver *driver = self->owner;

  /* short-circuit order matters: msg->flags is only consulted when
   * LTDF_SEQNUM is set and LTDF_SEQNUM_ALL is not */
  if (!(driver->flags & LTDF_SEQNUM) ||
      !((driver->flags & LTDF_SEQNUM_ALL) || (msg->flags & LF_LOCAL)))
    self->seq_num = 0;
  else if (driver->num_workers > 1)
    self->seq_num = step_sequence_number_atomic(&driver->shared_seq_num);
  else
    self->seq_num = step_sequence_number(&driver->shared_seq_num);

  LogThreadedResult outcome = self->insert(self, msg);

  gboolean accepted = (outcome == LTR_QUEUED ||
                       outcome == LTR_SUCCESS ||
                       outcome == LTR_EXPLICIT_ACK_MGMT);

  if (!self->metrics.message_delay_sample || !accepted)
    return outcome;

  UnixTime now;

  unix_time_set_now(&now);
  gint64 delay_msec = unix_time_diff_in_msec(&now, &msg->timestamps[LM_TS_RECVD]);

  /* at most one sample per wall-clock second */
  if (self->metrics.last_delay_update == now.ut_sec)
    return outcome;

  stats_counter_set_time(self->metrics.message_delay_sample, delay_msec);
  stats_counter_set_time(self->metrics.message_delay_sample_age, now.ut_sec);
  self->metrics.last_delay_update = now.ut_sec;

  return outcome;
}

/* Flush the worker through its optional flush() callback.
 *
 * Returns the callback's result, or LTR_SUCCESS when the worker has no
 * flush() callback.  In both cases self->last_flush_time is refreshed from
 * iv_now after iv_validate_now() has been called. */
static inline LogThreadedResult
log_threaded_dest_worker_flush(LogThreadedDestWorker *self, LogThreadedFlushMode mode)
{
  LogThreadedResult outcome = self->flush ? self->flush(self, mode) : LTR_SUCCESS;

  iv_validate_now();
  self->last_flush_time = iv_now;

  return outcome;
}

/* Convenience entry point for drivers that have not been migrated to the
 * worker API: performs a normal-mode flush of the worker instance embedded
 * in the driver and returns its result. */
static inline LogThreadedResult
log_threaded_dest_driver_flush(LogThreadedDestDriver *self)
{
  LogThreadedDestWorker *embedded_worker = &self->worker.instance;

  return log_threaded_dest_worker_flush(embedded_worker, LTF_FLUSH_NORMAL);
}

/* Worker-level functions implemented outside this header.
 * NOTE(review): the ack/drop/rewind trio presumably settles `batch_size`
 * messages taken from the worker's queue -- confirm in the implementation. */
void log_threaded_dest_worker_ack_messages(LogThreadedDestWorker *self, gint batch_size);
void log_threaded_dest_worker_drop_messages(LogThreadedDestWorker *self, gint batch_size);
void log_threaded_dest_worker_rewind_messages(LogThreadedDestWorker *self, gint batch_size);
void log_threaded_dest_worker_wakeup_when_suspended(LogThreadedDestWorker *self);
gboolean log_threaded_dest_worker_init_method(LogThreadedDestWorker *self);
void log_threaded_dest_worker_deinit_method(LogThreadedDestWorker *self);
void log_threaded_dest_worker_init_instance(LogThreadedDestWorker *self,
                                            LogThreadedDestDriver *owner,
                                            gint worker_index);
void log_threaded_dest_worker_free_method(LogThreadedDestWorker *self);
void log_threaded_dest_worker_free(LogThreadedDestWorker *self);

/* Statistics helpers; `b` and `len` are sizes passed as gsize. */
void log_threaded_dest_worker_written_bytes_add(LogThreadedDestWorker *self, gsize b);
void log_threaded_dest_driver_insert_msg_length_stats(LogThreadedDestDriver *self, gsize len);
void log_threaded_dest_driver_insert_batch_length_stats(LogThreadedDestDriver *self, gsize len);
void log_threaded_dest_driver_register_aggregated_stats(LogThreadedDestDriver *self);
void log_threaded_dest_driver_unregister_aggregated_stats(LogThreadedDestDriver *self);

/* Driver functions that take the driver through its LogPipe base pointer. */
gboolean log_threaded_dest_driver_deinit_method(LogPipe *s);
gboolean log_threaded_dest_driver_init_method(LogPipe *s);
gboolean log_threaded_dest_driver_start_workers(LogPipe *s);

void log_threaded_dest_driver_init_instance(LogThreadedDestDriver *self, GlobalConfig *cfg);
void log_threaded_dest_driver_free(LogPipe *s);

/* Option setters and flag processing; these take the driver as LogDriver. */
void log_threaded_dest_driver_set_max_retries_on_error(LogDriver *s, gint max_retries);
void log_threaded_dest_driver_set_num_workers(LogDriver *s, gint num_workers);
void log_threaded_dest_driver_set_worker_partition_key_ref(LogDriver *s, LogTemplate *key);
void log_threaded_dest_driver_set_flush_on_worker_key_change(LogDriver *s, gboolean f);
void log_threaded_dest_driver_set_batch_lines(LogDriver *s, gint batch_lines);
void log_threaded_dest_driver_set_batch_timeout(LogDriver *s, gint batch_timeout);
void log_threaded_dest_driver_set_time_reopen(LogDriver *s, time_t time_reopen);
gboolean log_threaded_dest_driver_process_flag(LogDriver *driver, const gchar *flag);

#endif