1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
|
/* Definition of the queue support module.
*
* Copyright 2008-2019 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED
#include <pthread.h>
#include "obj.h"
#include "wtp.h"
#include "batch.h"
#include "stream.h"
#include "statsobj.h"
#include "cryprov.h"
/* support for the toDelete list */
typedef struct toDeleteLst_s toDeleteLst_t;
struct toDeleteLst_s {
qDeqID deqID;
int nElemDeq; /* numbe of elements that were dequeued and as such must now be discarded */
struct toDeleteLst_s *pNext;
};
/* queue types */
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */
QUEUETYPE_DISK = 2, /* disk files used as buffer */
QUEUETYPE_DIRECT = 3 /* no queuing happens, consumer is directly called */
} queueType_t;
/* list member definition for linked list types of queues: */
typedef struct qLinkedList_S {
struct qLinkedList_S *pNext;
smsg_t *pMsg;
} qLinkedList_t;
/* the queue object */
struct queue_s {
BEGINobjInstance;
queueType_t qType;
int nLogDeq; /* number of elements currently logically dequeued */
int bShutdownImmediate; /* should all workers cease processing messages? */
sbool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
sbool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
sbool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
sbool takeFlowCtlFromMsg;/* override enq flow ctl by message property? */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */
int iMinMsgsPerWrkr;
/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
wtp_t *pWtpDA;
wtp_t *pWtpReg;
action_t *pAction; /* for action queues, ptr to action object; for main queues unused */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
sbool bSyncQueueFiles;/* if working with files, sync them after each write? */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */
int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
sbool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
int toQShutdown; /* timeout for regular queue shutdown in ms */
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */
int toEnq; /* enqueue timeout */
int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
int iMinDeqBatchSize;/* min number of elements that shall be dequeued at once */
int toMinDeqBatchSize;/* timeout for MinDeqBatchSize, in ms */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
/* dequeue time window settings (may also be expanded) */
int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */
int iDeqtWinToHr; /* end of dequeue time window (hour only), set to 25 to disable deq window! */
/* note that begin and end have specific semantics. It is a big difference if we have
* begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p,
* throughout the night and stop at 4 in the morning. In the first case, it will start
* at 4am, run throughout the day, and stop at 10 in the evening! So far, not logic is
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
* is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
rsRetVal (*qAdd)(struct queue_s *pThis, smsg_t *pMsg);
rsRetVal (*qDeq)(struct queue_s *pThis, smsg_t **ppMsg);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
/* public entry points (set during construction, permit to set best algorithm for params selected) */
rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* end public entry points */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
pthread_cond_t notFull;
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* the following variables are always present, because they
* are not only used for the "disk" queueing mode but also for
* any other queueing mode if it is set to "disk assisted".
* rgerhards, 2008-01-09
*/
uchar *pszSpoolDir;
size_t lenSpoolDir;
uchar *pszFilePrefix;
size_t lenFilePrefix;
uchar *pszQIFNam; /* full .qi file name, based on parts above */
size_t lenQIFNam;
int iNumberFiles; /* how many files make up the queue? */
int64 iMaxFileSize; /* max size for a single queue file */
int64 sizeOnDiskMax; /* maximum size on disk allowed */
qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */
qDeqID deqIDDel; /* queue store delete position */
int bIsDA; /* is this queue disk assisted? */
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
/* now follow queueing mode specific data elements */
//union { /* different data elements based on queue type (qType) */
struct { /* different data elements based on queue type (qType) */
struct {
long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
} farray;
struct {
qLinkedList_t *pDeqRoot;
qLinkedList_t *pDelRoot;
qLinkedList_t *pLast;
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
int64 deqOffs; /* offset after dequeue batch - used for file deleter */
int deqFileNumIn; /* same for the circular file numbers, mainly for */
int deqFileNumOut;/* deleting finished files */
strm_t *pWrite; /* current file to be written */
strm_t *pReadDeq; /* current file for dequeueing */
strm_t *pReadDel; /* current file for deleting */
int nForcePersist;/* force persist of .qi file the next "n" times */
} disk;
} tVars;
sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */
uchar *cryprovName; /* crypto provider to use */
cryprov_if_t cryprov; /* ptr to crypto provider interface */
void *cryprovData; /* opaque data ptr for provider use */
uchar *cryprovNameFull;/* full internal crypto provider name */
DEF_ATOMIC_HELPER_MUT(mutQueueSize)
DEF_ATOMIC_HELPER_MUT(mutLogDeq)
/* for statistics subsystem */
statsobj_t *statsobj;
STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued)
STATSCOUNTER_DEF(ctrFull, mutCtrFull)
STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd)
STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd)
int ctrMaxqsize; /* NOT guarded by a mutex */
int iSmpInterval; /* line interval of sampling logs */
int isRunning;
};
/* the define below is an "eternal" timeout for the timeout settings which require a value.
* It is one day, which is not really eternal, but comes close to it if we think about
* rsyslog (e.g.: do you want to wait on shutdown for more than a day? ;))
* rgerhards, 2008-01-17
*/
#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, smsg_t *pMsg);
rsRetVal qqueueStart(rsconf_t *cnf, qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *));
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);
rsRetVal qqueueShutdownWorkers(qqueue_t *pThis);
void qqueueDoneLoadCnf(void);
int queuesEqual(qqueue_t *pOld, qqueue_t *pNew);
void qqueueCorrectParams(qqueue_t *pThis);
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
PROTOTYPEpropSetMeth(qqueue, toActShutdown, long);
PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long);
PROTOTYPEpropSetMeth(qqueue, toEnq, long);
PROTOTYPEpropSetMeth(qqueue, iLightDlyMrk, int);
PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int);
PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(qqueue, iNumWorkerThreads, int);
PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(qqueue, pAction, action_t*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
#define qqueueGetID(pThis) ((unsigned long) pThis)
#ifdef ENABLE_IMDIAG
extern unsigned int iOverallQueueSize;
#endif
#endif /* #ifndef QUEUE_H_INCLUDED */
|