.. include:: <isonum.txt>
*****************************
Imhiredis: Redis input plugin
*****************************
==================== =====================================
**Module Name:** **imhiredis**
**Author:** Jeremie Jourdin <jeremie.jourdin@advens.fr>
**Contributors:** Theo Bertin <theo.bertin@advens.fr>
==================== =====================================
Purpose
=======
Imhiredis is an input module reading arbitrary entries from Redis.
It uses the `hiredis library <https://github.com/redis/hiredis>`_ to query Redis instances in one of three modes:
- **queues**, using `LIST <https://redis.io/commands#list>`_ commands
- **channels**, using `SUBSCRIBE <https://redis.io/commands#pubsub>`_ commands
- **streams**, using `XREAD/XREADGROUP <https://redis.io/commands/?group=stream>`_ commands
.. _imhiredis_queue_mode:
Queue mode
----------
The **queue** mode uses Redis LISTs to push/pop messages to/from lists. It allows simple and efficient use of Redis as a queueing system, providing both LIFO and FIFO methods.
Prefer this mode when Redis is used as a caching system, with one (or many) Rsyslog instances POP'ing out entries.
.. Warning::
This mode is tuned for good performance without straining Redis, but because imhiredis has to poll the instance, some trade-offs had to be made:
- imhiredis POPs entries in batches of 10 to improve performance (the batch size is configurable via the batchsize parameter)
- when no entries are left in the list, the module sleeps for 1 second before checking the list again. Messages may therefore be delayed by as much as 1 second between a push to the list and a pop by imhiredis (entries are still POP'ed out as fast as possible while the list is not empty)
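
A minimal queue-mode input might look like the sketch below. The server address, the key ``my_queue``, the ruleset name and the output file are placeholders chosen for illustration; ``batchsize`` is raised from its default of 10 only to show where the setting goes.

.. code-block:: none

   module(load="imhiredis")

   # Pop entries from the Redis list "my_queue" (hypothetical name)
   input(type="imhiredis"
         mode="queue"
         server="127.0.0.1"
         port="6379"
         key="my_queue"
         batchsize="50"
         ruleset="redis_queue")

   # Hypothetical ruleset writing every popped entry to a file
   ruleset(name="redis_queue") {
       action(type="omfile" file="/var/log/redis_queue.log")
   }
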
.. _imhiredis_channel_mode:
Channel mode
------------
The **subscribe** mode uses Redis' PUB/SUB system to listen to messages published to Redis channels. It allows performant use of Redis as a message broker.
Prefer this mode when Redis acts as a message broker, with zero, one or many subscribers listening for new messages.
.. Warning::
This mode shouldn't be used if messages must be processed reliably: any message published while no imhiredis instance is listening is lost.
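
As a sketch, a subscribe-mode input only needs the channel name in ``key``. The channel ``my_channel``, the ruleset name and the output file are hypothetical placeholders.

.. code-block:: none

   module(load="imhiredis")

   # Listen to messages published on the channel "my_channel" (hypothetical name)
   input(type="imhiredis"
         mode="subscribe"
         server="127.0.0.1"
         key="my_channel"
         ruleset="redis_channel")

   ruleset(name="redis_channel") {
       action(type="omfile" file="/var/log/redis_channel.log")
   }
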
.. _imhiredis_stream_mode:
Stream mode
------------
The **stream** mode uses `Redis Streams system <https://redis.io/docs/data-types/streams/>`_ to read entries published to Redis' streams. It is a good alternative when:
- sharing work is desired
- not losing any log (even in the case of a crash) is mandatory
This mode is especially useful to define pools of workers that do various processing along the way, while ensuring not a single log is lost during processing by a worker.
.. note::
As Redis streams do not insert simple values in keys, but rather field/value pairs, this mode can also be useful when handling structured data. This is better shown with the examples for the parameter :ref:`imhiredis_fields`.
This mode also adds internal metadata to the message. The metadata is not included in the JSON data or regular fields; instead, the following local variables are set:
- **$.redis!stream** will be added to the message, with the value of the source stream
- **$.redis!index** will be added to the message, with the exact ID of the entry
- **$.redis!group** will be added to the message (if :ref:`imhiredis_stream_consumergroup` is set), with the value of the group used to read the entry
- **$.redis!consumer** will be added to the message (if :ref:`imhiredis_stream_consumername` is set), with the value of the consumer name used to read the entry
This is especially useful in combination with the omhiredis module, which can then obtain the required information semi-automatically (custom templates are still required in the user configuration).
.. Warning::
This mode is the most reliable way to handle entries stored in Redis, but it may also carry the most overhead. Although that overhead is still minimal, make sure to test the different options and determine whether this mode is right for you!
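
The sketch below shows one way the stream metadata could be surfaced in an output. It assumes that ``key`` names the stream to read in stream mode, which the ``key`` parameter description does not state explicitly; the stream name, template name, ruleset name and file path are hypothetical.

.. code-block:: none

   module(load="imhiredis")

   # Print the source stream, the entry ID and the entry's fields as JSON
   template(name="redis_stream_meta" type="string"
            string="stream=%$.redis!stream% id=%$.redis!index% data=%$!all-json%\n")

   # Plain XREAD-based input (no consumer group configured)
   input(type="imhiredis"
         mode="stream"
         server="127.0.0.1"
         key="my_stream"
         ruleset="redis_stream")

   ruleset(name="redis_stream") {
       action(type="omfile" file="/var/log/redis_stream.log"
              template="redis_stream_meta")
   }

Since no consumer group or consumer name is configured here, only **$.redis!stream** and **$.redis!index** are referenced.
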
Master/Replica
--------------
This module can automatically connect to the master instance of a master/replica(s) cluster. Given a valid connection entry point (either the current master or a valid replica), imhiredis redirects to the master node on startup and whenever node states change.
Configuration Parameters
========================
.. note::
Parameter names are case-insensitive.
Input Parameters
----------------
.. _imhiredis_mode:
mode
^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "subscribe", "yes", "none"
| Defines the mode to use for the module.
| Should be either "**subscribe**" (:ref:`imhiredis_channel_mode`), "**queue**" (:ref:`imhiredis_queue_mode`) or "**stream**" (:ref:`imhiredis_stream_mode`) (case-sensitive).
ruleset
^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "no", "none"
Assign messages from this input to a specific Rsyslog ruleset.
batchsize
^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"number", "10", "yes", "none"
Defines the dequeue batch size for Redis pipelining.
imhiredis will read "**batchsize**" elements from Redis at a time.
When using the :ref:`imhiredis_queue_mode`, defines the size of the batch to use with LPOP / RPOP.
.. _imhiredis_key:
key
^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "yes", "none"
Defines either the name of the list to use (for :ref:`imhiredis_queue_mode`) or the channel to listen to (for :ref:`imhiredis_channel_mode`).
.. _imhiredis_socketPath:
socketPath
^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "no", "if no :ref:`imhiredis_server` provided", "none"
Defines the socket to use when connecting to Redis. This parameter is ignored if both :ref:`imhiredis_server` and :ref:`imhiredis_socketPath` are given.
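
A sketch of a connection over a Unix socket follows. The socket path and key are hypothetical, and ``server`` is deliberately left out because the socket is ignored when both are given.

.. code-block:: none

   module(load="imhiredis")

   # Connect through a local Unix socket instead of TCP (path is a placeholder)
   input(type="imhiredis"
         mode="queue"
         socketPath="/var/run/redis/redis.sock"
         key="my_queue")
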
.. _imhiredis_server:
server
^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"ip", "127.0.0.1", "if no :ref:`imhiredis_socketPath` provided", "none"
The Redis server's IP to connect to.
.. _imhiredis_port:
port
^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"number", "6379", "no", "none"
The Redis server's port to use when connecting via IP.
.. _imhiredis_password:
password
^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "no", "none"
The password to use when connecting to a Redis node, if necessary.
.. _imhiredis_uselpop:
uselpop
^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"boolean", "no", "no", "none"
| When using the :ref:`imhiredis_queue_mode`, defines whether imhiredis should use an LPOP instruction instead of an RPOP (the default).
| Has no influence on the :ref:`imhiredis_channel_mode` and will be ignored if set with this mode.
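
The pop direction determines ordering relative to how producers push. As a sketch, assuming producers add entries with LPUSH (an assumption about the producer side, which this module does not control), the default RPOP drains the list oldest-first (FIFO), while ``uselpop="on"`` drains it newest-first (LIFO). The key name is a placeholder.

.. code-block:: none

   module(load="imhiredis")

   # With LPUSH producers, LPOP returns the most recently pushed entries first
   input(type="imhiredis"
         mode="queue"
         server="127.0.0.1"
         key="my_queue"
         uselpop="on")
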
.. _imhiredis_stream_consumergroup:
stream.consumerGroup
^^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines a consumer group name to use (see `the XREADGROUP documentation <https://redis.io/commands/xreadgroup/>`_ for details). This parameter activates the use of **XREADGROUP** commands in place of simple XREADs.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, :ref:`imhiredis_stream_consumername` should also be set.
.. _imhiredis_stream_consumername:
stream.consumerName
^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines a consumer name to use (see `the XREADGROUP documentation <https://redis.io/commands/xreadgroup/>`_ for details). This parameter activates the use of **XREADGROUP** commands in place of simple XREADs.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, :ref:`imhiredis_stream_consumergroup` should also be set.
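
Because the group and the consumer name go together, a consumer-group input could be sketched as below. The group ``rsyslog_workers``, the consumer ``worker_1`` and the stream name are hypothetical, and ``key`` is assumed to name the stream.

.. code-block:: none

   module(load="imhiredis")

   # Read through XREADGROUP as consumer "worker_1" of group "rsyslog_workers"
   input(type="imhiredis"
         mode="stream"
         server="127.0.0.1"
         key="my_stream"
         stream.consumerGroup="rsyslog_workers"
         stream.consumerName="worker_1")

Several rsyslog instances sharing the same group but using distinct consumer names would then split the stream's entries between them, which is the usual Redis consumer-group behaviour.
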
.. _imhiredis_stream_readfrom:
stream.readFrom
^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "$", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines the `starting ID <https://redis.io/docs/data-types/streams-tutorial/#entry-ids>`_ for XREAD/XREADGROUP commands (can also use special IDs, see `documentation <https://redis.io/docs/data-types/streams-tutorial/#special-ids-in-the-streams-api>`_).
| Has no influence in the other modes (queue or channel) and will be ignored.
.. _imhiredis_stream_consumerack:
stream.consumerACK
^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"boolean", "on", "no", "none"
| When using :ref:`imhiredis_stream_mode` with :ref:`imhiredis_stream_consumergroup` and :ref:`imhiredis_stream_consumername`, determines if the module should directly acknowledge the ID once read from the Consumer Group.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
When using Consumer Groups with imhiredis, omhiredis can also integrate with this workflow to acknowledge a processed message once it has been put back in another stream (or somewhere else). In that case, set this parameter to **off** so that the omhiredis module acknowledges the input ID once the message has been sent correctly.
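
On the input side, deferring acknowledgement might be sketched as follows. The group, consumer and stream names are placeholders, ``key`` is assumed to name the stream, and the output side that eventually acknowledges the entry is not shown.

.. code-block:: none

   module(load="imhiredis")

   # Entries stay pending in the group until something else acknowledges them
   input(type="imhiredis"
         mode="stream"
         server="127.0.0.1"
         key="my_stream"
         stream.consumerGroup="rsyslog_workers"
         stream.consumerName="worker_1"
         stream.consumerACK="off")
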
.. _imhiredis_stream_autoclaimidletime:
stream.autoclaimIdleTime
^^^^^^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"positive number", "0", "no", "none"
| When using :ref:`imhiredis_stream_mode` with :ref:`imhiredis_stream_consumergroup` and :ref:`imhiredis_stream_consumername`, sets an idle-time threshold (**in milliseconds**). The module checks for pending IDs that have been idle longer than this threshold, assumes the original consumer failed to acknowledge them, and claims them for itself (see `the redis documentation <https://redis.io/docs/data-types/streams-tutorial/#automatic-claiming>`_ on this subject for more details on how that works).
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, the AUTOCLAIM operation will also take into account the specified :ref:`imhiredis_stream_readfrom` parameter. **If its value is '$' (default), the AUTOCLAIM commands will use '0-0' as the starting ID**.
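
As an illustration, the sketch below claims entries that have been pending for more than one minute. The 60000 ms threshold and all names are arbitrary placeholders, and ``key`` is assumed to name the stream.

.. code-block:: none

   module(load="imhiredis")

   # Claim entries left unacknowledged by other consumers for over 60 seconds
   input(type="imhiredis"
         mode="stream"
         server="127.0.0.1"
         key="my_stream"
         stream.consumerGroup="rsyslog_workers"
         stream.consumerName="worker_2"
         stream.autoclaimIdleTime="60000")
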
.. _imhiredis_fields:
fields
^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"array", "[]", "no", "none"
| When using :ref:`imhiredis_stream_mode`, the module won't get a simple entry but will instead get hashes, with field/value pairs.
| By default, the module inserts every value under its respective field name in the **$!** object. This parameter changes that behaviour: each array element is a string interpreted as follows:
- if the element begins with a **!** or a **.**, it is treated as the name of a field to read from the original entry
- if the element doesn't begin with a **!** or a **.**, it is taken verbatim as a static value
- in addition, if the element is prefixed with a **:<key>:** pattern, the value (verbatim or taken from the entry) is inserted under that specific key (or subkey)
*Examples*:
.. csv-table::
:header: "configuration", "result"
:widths: auto
:class: parameter-table
``["static_value"]``, the value "static_value" will be inserted in $!static_value
``[":key:static_value"]``, the value "static_value" will be inserted in $!key
``["!field"]``, the value of the field "field" will be inserted in $!field
``[":key!subkey:!field"]``, the value of the field "field" will be inserted in $!key!subkey
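
Put together in an input, the rules above could be used as in this sketch. It assumes that several entries may be combined in one array and that ``key`` names the stream; the stream and field names are hypothetical.

.. code-block:: none

   module(load="imhiredis")

   # Copy the entry's "field" value to $!field and set $!key to a static string
   input(type="imhiredis"
         mode="stream"
         server="127.0.0.1"
         key="my_stream"
         fields=["!field", ":key:static_value"])

Going by the table above, an entry added with ``XADD my_stream * field hello`` would be expected to yield ``$!field`` set to "hello" and ``$!key`` set to "static_value".
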