1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
-- Copyright (C) Dejiang Zhu(doujiang24)
local response = require "resty.kafka.response"
local to_int32 = response.to_int32
local setmetatable = setmetatable
local tcp = ngx.socket.tcp
-- Module table: the functions attached to it below are what this file
-- exports through its final `return _M`.
local _M = {}
-- Metatable applied by _M.new; __index makes lookups of fields missing on
-- an instance fall back to _M, so instances can call the module's functions
-- as methods.
local mt = { __index = _M }
--- Build a broker object describing one host/port endpoint.
-- Nothing is connected here; the arguments are only stored on the new table.
-- @param self           receiver; not used by the body, present so the
--                       function can be invoked with `:` call syntax
-- @param host           stored as `host`, later passed to `sock:connect`
-- @param port           stored as `port`, later passed to `sock:connect`
-- @param socket_config  stored as `config`; send_receive reads its
--                       `socket_timeout`, `keepalive_timeout` and
--                       `keepalive_size` fields
-- @return the new table, with `mt` installed as its metatable
function _M.new(self, host, port, socket_config)
    local broker = {
        host = host,
        port = port,
        config = socket_config,
    }
    return setmetatable(broker, mt)
end
--- Send one request over a fresh socket object and read back one reply.
-- The reply is read in two steps: a 4-byte prefix, which `to_int32` turns
-- into the size of the remainder, followed by that many bytes.
-- @param self     broker created by _M.new (`host`, `port`, `config` are read)
-- @param request  object providing `package()` (the payload handed to
--                 `sock:send`) and an `api_version` field
-- @return on success: the first value of `response:new(body, api_version)`,
--         then nil, then true
-- @return on a receive timeout: nil and the error string (two values only),
--         after explicitly closing the socket
-- @return on any other failure: nil, the error string, true
-- NOTE(review): the third value is presumably a "caller may retry" flag --
-- confirm against the callers of this function.
function _M.send_receive(self, request)
    local conn, create_err = tcp()
    if not conn then
        return nil, create_err, true
    end

    conn:settimeout(self.config.socket_timeout)

    local connected, connect_err = conn:connect(self.host, self.port)
    if not connected then
        return nil, connect_err, true
    end

    -- request:package() is kept inline so that everything it returns is
    -- forwarded to send unchanged.
    local sent, send_err = conn:send(request:package())
    if not sent then
        return nil, send_err, true
    end

    -- Step 1: fixed-size length prefix.
    local header, header_err = conn:receive(4)
    if not header then
        if header_err ~= "timeout" then
            return nil, header_err, true
        end
        -- NOTE(review): only the timeout path closes the socket by hand;
        -- presumably the cosocket API already closes it on other receive
        -- errors -- confirm against the lua-nginx-module documentation.
        conn:close()
        return nil, header_err
    end

    -- Step 2: the remainder of the reply, whose size the prefix announced.
    local body_len = to_int32(header)
    local body, body_err = conn:receive(body_len)
    if not body then
        if body_err ~= "timeout" then
            return nil, body_err, true
        end
        conn:close()
        return nil, body_err
    end

    -- Hand the connection to the keepalive pool; the result of this call is
    -- not checked.
    conn:setkeepalive(self.config.keepalive_timeout, self.config.keepalive_size)

    local reply = response:new(body, request.api_version)
    return reply, nil, true
end
return _M
|