1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[[
# UDP and UNIX Socket Input
## Sample Configuration
```lua
filename = "udp.lua"
instruction_limit = 0
-- address (string) - An IP address (* for all interfaces), or a path to a UNIX
-- socket.
-- Default:
-- address = "127.0.0.1"
-- port (integer) - IP port to listen on (ignored for UNIX socket).
-- Default:
-- port = 514
-- default_headers (table) - Sets the message headers to these values if they
-- are not set by the decoder.
-- This input will always default the Fields.sender_ip and Fields.sender_port
-- for non UNIX sockets.
-- Default:
-- default_headers = nil
-- Specify a module that will decode the raw data and inject the resulting message.
-- Default:
-- decoder_module = "decoders.heka.protobuf"
-- send_decode_failures (bool) - When true a decode failure will inject a
-- message with the following structure:
-- msg.Type = "error.<category>"
-- msg.Payload = "<error message>"
-- msg.Fields.data = "<data that failed decode>"
-- Default
-- send_decode_failures = false
```
--]]
local socket = require "socket"
local address = read_config("address") or "127.0.0.1"
local is_unixsock = address:sub(1,1) == "/"
local port = read_config("port") or 514
local default_headers = read_config("default_headers")
assert(default_headers == nil or type(default_headers) == "table", "invalid default_headers cfg")
local decoder_module = read_config("decoder_module") or "decoders.heka.protobuf"
local decode = require(decoder_module).decode
if not decode then
error(decoder_module .. " does not provide a decode function")
end
local send_decode_failures = read_config("send_decode_failures")
local err_msg = {
Type = nil,
Payload = nil,
Fields = {
data = nil
}
}
local server
if is_unixsock then
socket.unix = require "socket.unix"
server = assert(socket.unix.udp())
require "os"
os.remove(address)
assert(server:bind(address))
else
server = assert(socket.udp())
assert(server:setsockname(address, port))
server:settimeout(1)
if not default_headers then default_headers = {} end
local port = {value = 0, value_type = 2}
default_headers.Fields = { sender_port = port }
err_msg.Fields.sender_port = port
end
local is_running = is_running
function process_message()
while is_running() do
local data, remote, port = server:receivefrom()
if data then
if not is_unixsock then
default_headers.Fields.sender_ip = remote
default_headers.Fields.sender_port.value = port
end
local ok, err = pcall(decode, data, default_headers)
if (not ok or err) and send_decode_failures then
err_msg.Type = "error.decode"
err_msg.Payload = err
err_msg.Fields.data = data
if not is_unixsock then
err_msg.Fields.sender_ip = remote
-- port is already set in the shared table
end
pcall(inject_message, err_msg)
end
elseif remote ~= "timeout" then
err_msg.Type = "error.closed"
err_msg.Payload = remote
err_msg.Fields.data = nil
pcall(inject_message, err_msg)
end
end
return 0
end
|