1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
|
/*
* Copyright (c) 2017 Balabit
* Copyright (c) 2017 Balazs Scheidler <balazs.scheidler@balabit.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/
#include "scratch-buffers.h"
#include "tls-support.h"
#include "stats/stats-registry.h"
#include "timeutils/cache.h"
#include "messages.h"
#include "apphook.h"
#include <iv.h>
/*
* scratch_buffers
*
* The design of this API is to allow call-sites to use GString based string
* buffers easily, without incurring the allocation overhead at all such
* invocations and the need to always explicitly free them.
*
* Use-cases:
* - something needs a set of GString buffers, such that their number is
* not determinable at init time, so no preallocation is possible.
* Allocating/freeing these buffers on-demand generates a non-trivial
* overhead.
*
* - We need a GString buffer that is needed for the duration of the
* processing of the given message, for example CSVScanner/KVScanner's
* buffers. Right now, these are locked data structures or GString's
* are allocated on demand, both of which slows them down. By using
* this API, these could reuse existing allocations and only free them
* once processing is finished of the given message.
*
* Design principles:
* - you can allocate a GString instance using scratch_buffers_alloc()
*
* - these are thread specific allocations and should not be leaked
* through thread boundaries, not until your own next invocation
*
* - you need to assume that once you return from your functions, the
* previous allocations are gone!
*
* - you don't need to free these allocations, as they get released
* automatically at the end of the message processing by the worker
* thread machinery
*
* - if you produce too much garbage, you can still explicitly free these
* buffers in bulk using the mark/reclaim functions.
*
* - reclaim works by freeing _all_ allocations up to a mark, so even
* those that are allocated by the functions you called. Please make
* sure that those do not hold references after returning.
*
* Other notes:
* - this is not a complete garbage collector, but a very simple allocator for
* buffers that creates coupling between various/independent parts of
* the codebase.
*
* - ONLY USE IT IF YOU UNDERSTAND THE IMPLICATIONS AND PROVIDES A
* MEASURABLE PERFORMANCE IMPACT.
*/
TLS_BLOCK_START
{
GPtrArray *scratch_buffers;
gint scratch_buffers_used;
glong scratch_buffers_bytes_reported;
time_t scratch_buffers_time_of_last_maintenance;
struct iv_task scratch_buffers_gc;
gboolean scratch_buffers_gc_executed;
}
TLS_BLOCK_END;
/* accessed by the test program */
StatsCounterItem *stats_scratch_buffers_count;
StatsCounterItem *stats_scratch_buffers_bytes;
#define scratch_buffers __tls_deref(scratch_buffers)
#define scratch_buffers_used __tls_deref(scratch_buffers_used)
#define scratch_buffers_bytes_reported __tls_deref(scratch_buffers_bytes_reported)
#define scratch_buffers_time_of_last_maintenance __tls_deref(scratch_buffers_time_of_last_maintenance)
#define scratch_buffers_gc __tls_deref(scratch_buffers_gc)
#define scratch_buffers_gc_executed __tls_deref(scratch_buffers_gc_executed)
/* update allocation counters once every period, in seconds */
#define SCRATCH_BUFFERS_MAINTENANCE_PERIOD 5
static void _register_gc_task(void);
void
scratch_buffers_mark(ScratchBuffersMarker *marker)
{
*marker = scratch_buffers_used;
}
GString *
scratch_buffers_alloc_and_mark(ScratchBuffersMarker *marker)
{
_register_gc_task();
if (marker)
scratch_buffers_mark(marker);
if (scratch_buffers_used >= scratch_buffers->len)
{
g_ptr_array_add(scratch_buffers, g_string_sized_new(255));
stats_counter_inc(stats_scratch_buffers_count);
}
GString *buffer = g_ptr_array_index(scratch_buffers, scratch_buffers_used);
g_string_truncate(buffer, 0);
scratch_buffers_used++;
return buffer;
}
GString *
scratch_buffers_alloc(void)
{
return scratch_buffers_alloc_and_mark(NULL);
}
void
scratch_buffers_reclaim_allocations(void)
{
scratch_buffers_reclaim_marked(0);
}
void
scratch_buffers_reclaim_marked(ScratchBuffersMarker marker)
{
scratch_buffers_used = marker;
}
/* get a snapshot of the global allocation counter, can be racy */
gint
scratch_buffers_get_global_allocation_count(void)
{
return stats_counter_get(stats_scratch_buffers_count);
}
/* get the number of thread-local allocations does not race */
gint
scratch_buffers_get_local_allocation_count(void)
{
return scratch_buffers->len;
}
glong
scratch_buffers_get_local_allocation_bytes(void)
{
glong bytes = 0;
for (gint i = 0; i < scratch_buffers->len; i++)
{
GString *str = g_ptr_array_index(scratch_buffers, i);
bytes += str->allocated_len;
}
return bytes;
}
gint
scratch_buffers_get_local_usage_count(void)
{
return scratch_buffers_used;
}
void
scratch_buffers_update_stats(void)
{
glong prev_reported = scratch_buffers_bytes_reported;
scratch_buffers_bytes_reported = scratch_buffers_get_local_allocation_bytes();
stats_counter_add(stats_scratch_buffers_bytes, -prev_reported + scratch_buffers_bytes_reported);
}
void
scratch_buffers_allocator_init(void)
{
scratch_buffers = g_ptr_array_sized_new(256);
}
void
scratch_buffers_allocator_deinit(void)
{
/* check if GC was executed */
if (scratch_buffers_used > 0 && !scratch_buffers_gc_executed)
{
msg_warning("WARNING: an exiting thread left behind scratch buffers garbage without ever calling the GC. This message could indicate a memory leak",
evt_tag_int("count", scratch_buffers->len),
evt_tag_long("bytes", scratch_buffers_bytes_reported));
}
/* remove our values from stats */
stats_counter_add(stats_scratch_buffers_count, -scratch_buffers->len);
stats_counter_add(stats_scratch_buffers_bytes, -scratch_buffers_bytes_reported);
/* free thread local scratch buffers */
for (int i = 0; i < scratch_buffers->len; i++)
{
GString *buffer = g_ptr_array_index(scratch_buffers, i);
g_string_free(buffer, TRUE);
}
g_ptr_array_free(scratch_buffers, TRUE);
}
/*********************************************************
* Lazy stats update
*********************************************************/
static gboolean
_thread_maintenance_period_elapsed(void)
{
if (!scratch_buffers_time_of_last_maintenance)
return TRUE;
if (scratch_buffers_time_of_last_maintenance - cached_g_current_time_sec() >= SCRATCH_BUFFERS_MAINTENANCE_PERIOD)
return TRUE;
return FALSE;
}
static void
_thread_maintenance_update_time(void)
{
scratch_buffers_time_of_last_maintenance = cached_g_current_time_sec();
}
void
scratch_buffers_lazy_update_stats(void)
{
if (_thread_maintenance_period_elapsed())
{
scratch_buffers_update_stats();
_thread_maintenance_update_time();
}
}
/*********************************************************
* Ivykis task based garbage collector
*********************************************************/
void
scratch_buffers_explicit_gc(void)
{
scratch_buffers_lazy_update_stats();
scratch_buffers_reclaim_allocations();
scratch_buffers_gc_executed = TRUE;
}
static void
_garbage_collect_scratch_buffers(gpointer arg)
{
scratch_buffers_explicit_gc();
}
static void
_register_gc_task(void)
{
if (scratch_buffers_gc.handler && iv_inited() && !iv_task_registered(&scratch_buffers_gc))
iv_task_register(&scratch_buffers_gc);
}
void
scratch_buffers_automatic_gc_init(void)
{
IV_TASK_INIT(&scratch_buffers_gc);
if (iv_inited())
{
/* the automatic GC requires an ivykis thread */
scratch_buffers_gc.handler = _garbage_collect_scratch_buffers;
}
}
void
scratch_buffers_automatic_gc_deinit(void)
{
if (iv_task_registered(&scratch_buffers_gc))
iv_task_unregister(&scratch_buffers_gc);
}
void
scratch_buffers_register_stats(void)
{
StatsClusterKey sc_key;
stats_lock();
stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "scratch_buffers_count", NULL);
stats_register_counter(0, &sc_key, SC_TYPE_QUEUED, &stats_scratch_buffers_count);
stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "scratch_buffers_bytes", NULL);
stats_register_counter(0, &sc_key, SC_TYPE_QUEUED, &stats_scratch_buffers_bytes);
stats_unlock();
}
void
scratch_buffers_unregister_stats(void)
{
StatsClusterKey sc_key;
stats_lock();
stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "scratch_buffers_count", NULL);
stats_unregister_counter(&sc_key, SC_TYPE_QUEUED, &stats_scratch_buffers_count);
stats_cluster_logpipe_key_set(&sc_key, SCS_GLOBAL, "scratch_buffers_bytes", NULL);
stats_unregister_counter(&sc_key, SC_TYPE_QUEUED, &stats_scratch_buffers_bytes);
stats_unlock();
}
void
scratch_buffers_global_init(void)
{
register_application_hook(AH_RUNNING, (ApplicationHookFunc) scratch_buffers_register_stats, NULL);
}
void
scratch_buffers_global_deinit(void)
{
scratch_buffers_unregister_stats();
}
|