File: heka_tcp.lua

package info (click to toggle)
lua-sandbox-extensions 0~git20161128-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 2,596 kB
  • ctags: 1,458
  • sloc: ansic: 4,402; cpp: 2,102; makefile: 8
file content (119 lines) | stat: -rw-r--r-- 3,624 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
# Heka Compatible TCP Input

## Sample Configuration
```lua
filename = "heka_tcp.lua"
instruction_limit = 0

-- address (string) - an IP address (* for all interfaces)
-- Default:
-- address = "127.0.0.1"

-- port (integer) - IP port to listen on (ignored for UNIX socket)
-- Default:
-- port = 5565

-- Size of the read chunks
-- Default:
-- buf_size = 1024 * 32

ssl_params = {
  mode = "server",
  protocol = "tlsv1",
  key = "/etc/hindsight/certs/serverkey.pem",
  certificate = "/etc/hindsight/certs/server.pem",
  cafile = "/etc/hindsight/certs/CA.pem",
  verify = {"peer", "fail_if_no_peer_cert"},
  options = {"all", "no_sslv3"}
}
```
--]]

require "coroutine"
local socket = require "socket"
require "string"
require "table"

local address    = read_config("address") or "127.0.0.1"
local port       = read_config("port") or 5565
local buf_size   = read_config("buf_size") or 1024 * 32
local ssl_params = read_config("ssl_params")

local ssl_ctx = nil
local ssl = nil
if ssl_params then
    ssl = require "ssl"
    ssl_ctx = assert(ssl.newcontext(ssl_params))
end
local server = assert(socket.bind(address, port))
server:settimeout(0)
local threads = {}
local sockets = {server}
local is_running = is_running

local function handle_client(client, caddr, cport)
    local found, consumed, need = false, 0, buf_size
    local hsr = create_stream_reader(string.format("%s:%d -> %s:%d", caddr, cport, address, port))
    client:settimeout(0)
    while client do
        local buf, err, partial = client:receive(need)
        if partial then buf = partial end
        if not buf then break end

        repeat
            found, consumed, need = hsr:find_message(buf)
            if found then inject_message(hsr) end
            buf = nil
        until not found

        if err == "closed" then break end
        coroutine.yield()
    end
end

function process_message()
    while is_running() do
        local ready = socket.select(sockets, nil, 1)
        if ready then
            for _, s in ipairs(ready) do
                if s == server then
                    local client = s:accept()
                    if client then
                        local caddr, cport = client:getpeername()
                        if not caddr then
                            caddr = "unknown"
                            cport = 0
                        end
                        if ssl_ctx then
                            client = ssl.wrap(client, ssl_ctx)
                            client:dohandshake()
                        end
                        sockets[#sockets + 1] = client
                        threads[client] = coroutine.create(
                            function() handle_client(client, caddr, cport) end)
                    end
                else
                    if threads[s] then
                        local status = coroutine.resume(threads[s])
                        if not status then
                            s:close()
                            for i = #sockets, 2, -1 do
                                if s == sockets[i] then
                                    table.remove(sockets, i)
                                    break
                                end
                            end
                            threads[s] = nil
                        end
                    end
                end
            end
        end
    end
    return 0
end