-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[[
# Heka Compatible TCP Input
## Sample Configuration
```lua
filename = "heka_tcp.lua"
instruction_limit = 0
-- address (string) - an IP address (* for all interfaces)
-- Default:
-- address = "127.0.0.1"
-- port (integer) - IP port to listen on (ignored for UNIX socket)
-- Default:
-- port = 5565
-- buf_size (integer) - size of the read chunks in bytes
-- Default:
-- buf_size = 1024 * 32
ssl_params = {
    mode = "server",
    protocol = "tlsv1",
    key = "/etc/hindsight/certs/serverkey.pem",
    certificate = "/etc/hindsight/certs/server.pem",
    cafile = "/etc/hindsight/certs/CA.pem",
    verify = {"peer", "fail_if_no_peer_cert"},
    options = {"all", "no_sslv3"}
}
```
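## Connection Handling Sketch
The plugin services each connection from its own coroutine: the handler reads
whatever is available, consumes it, and yields until the select loop resumes
it. The sketch below illustrates only that pattern and is not part of the
plugin. `fake_client` is a hypothetical stand-in for a non-blocking socket,
and the `while` loop stands in for the select loop.

```lua
-- Hypothetical stand-in for a non-blocking socket. Each receive() call
-- reports the next queued chunk the way a short read is reported
-- (nil, "timeout", partial); once the queue is empty it reports "closed".
local function fake_client(chunks)
    local i = 0
    return {
        receive = function(self, n)
            i = i + 1
            local chunk = chunks[i]
            if chunk then return nil, "timeout", chunk end
            return nil, "closed", ""
        end
    }
end

local received = {}

-- Same shape as handle_client: read, consume, then yield to the scheduler.
local function handle(client)
    while true do
        local buf, err, partial = client:receive(1024)
        if partial then buf = partial end
        if not buf then break end
        if #buf > 0 then received[#received + 1] = buf end
        if err == "closed" then break end
        coroutine.yield()
    end
end

local co = coroutine.create(function() handle(fake_client({"ab", "cd"})) end)

-- Stand-in for the select loop: resume until the handler returns.
local resumes = 0
while coroutine.status(co) ~= "dead" do
    assert(coroutine.resume(co))
    resumes = resumes + 1
end

print(table.concat(received), resumes)
```

The handler returns on "closed", which leaves the coroutine dead; the
scheduler can use that state as its signal to close and forget the socket.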
--]]
require "coroutine"
local socket = require "socket"
require "string"
require "table"
local address = read_config("address") or "127.0.0.1"
local port = read_config("port") or 5565
local buf_size = read_config("buf_size") or 1024 * 32
local ssl_params = read_config("ssl_params")
local ssl_ctx = nil
local ssl = nil
if ssl_params then
    ssl = require "ssl"
    ssl_ctx = assert(ssl.newcontext(ssl_params))
end
local server = assert(socket.bind(address, port))
server:settimeout(0)
local threads = {}
local sockets = {server}
local is_running = is_running
-- Runs as a coroutine, one per connection: reads whatever is available,
-- injects every complete message, then yields back to the select loop.
local function handle_client(client, caddr, cport)
    local found, consumed, need = false, 0, buf_size
    local hsr = create_stream_reader(string.format("%s:%d -> %s:%d", caddr, cport, address, port))
    client:settimeout(0)
    while client do
        local buf, err, partial = client:receive(need)
        -- a short read (timeout or close) comes back as partial data
        if partial then buf = partial end
        if not buf then break end
        repeat
            found, consumed, need = hsr:find_message(buf)
            if found then inject_message(hsr) end
            buf = nil -- later passes add no new data to the reader
        until not found
        if err == "closed" then break end
        coroutine.yield()
    end
end
function process_message()
    while is_running() do
        local ready = socket.select(sockets, nil, 1)
        if ready then
            for _, s in ipairs(ready) do
                if s == server then
                    local client = s:accept()
                    if client then
                        local caddr, cport = client:getpeername()
                        if not caddr then
                            caddr = "unknown"
                            cport = 0
                        end
                        if ssl_ctx then
                            -- drop the connection if the TLS wrap or handshake fails
                            local tls = ssl.wrap(client, ssl_ctx)
                            if tls and tls:dohandshake() then
                                client = tls
                            else
                                if tls then tls:close() else client:close() end
                                client = nil
                            end
                        end
                        if client then
                            sockets[#sockets + 1] = client
                            threads[client] = coroutine.create(
                                function() handle_client(client, caddr, cport) end)
                        end
                    end
                else
                    local thread = threads[s]
                    if thread then
                        local status = coroutine.resume(thread)
                        -- clean up on error or once the handler has returned
                        if not status or coroutine.status(thread) == "dead" then
                            s:close()
                            for i = #sockets, 2, -1 do
                                if s == sockets[i] then
                                    table.remove(sockets, i)
                                    break
                                end
                            end
                            threads[s] = nil
                        end
                    end
                end
            end
        end
    end
    return 0
end