1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
require 'uri'
require 'cgi'
require 'net/http'
require 'net/https'
module InfluxDB
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/AbcSize
module HTTP # :nodoc:
def get(url, options = {})
connect_with_retry do |http|
response = do_request http, Net::HTTP::Get.new(url)
case response
when Net::HTTPSuccess
handle_successful_response(response, options)
when Net::HTTPUnauthorized
raise InfluxDB::AuthenticationError, response.body
else
resolve_error(response.body)
end
end
end
def post(url, data)
headers = { "Content-Type" => "application/octet-stream" }
connect_with_retry do |http|
response = do_request http, Net::HTTP::Post.new(url, headers), data
case response
when Net::HTTPNoContent
return response
when Net::HTTPUnauthorized
raise InfluxDB::AuthenticationError, response.body
else
resolve_error(response.body)
end
end
end
private
def connect_with_retry
host = config.next_host
delay = config.initial_delay
retry_count = 0
begin
http = build_http(host, config.port)
http.open_timeout = config.open_timeout
http.read_timeout = config.read_timeout
http = setup_ssl(http)
yield http
rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e
raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE
rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e
retry_count += 1
unless (config.retry == -1 || retry_count <= config.retry) && !stopped?
raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but failed."
end
log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." }
sleep delay
delay = [config.max_delay, delay * 2].min
retry
ensure
http.finish if http.started?
end
end
def do_request(http, req, data = nil)
req.basic_auth config.username, config.password if basic_auth?
req.body = data if data
http.request(req)
end
def basic_auth?
config.auth_method == 'basic_auth'
end
def resolve_error(response)
case response
when /Couldn\'t find series/
raise InfluxDB::SeriesNotFound, response
else
raise InfluxDB::Error, response
end
end
def handle_successful_response(response, options)
if options.fetch(:json_streaming, false)
parsed_response = response.body.each_line.with_object({}) do |line, parsed|
parsed.merge!(JSON.parse(line)) { |_key, oldval, newval| oldval + newval }
end
elsif (body = response.body) && (body != "")
parsed_response = JSON.parse(response.body)
end
errors = errors_from_response(parsed_response)
raise InfluxDB::QueryError, errors if errors
options.fetch(:parse, false) ? parsed_response : response
end
def errors_from_response(parsed_resp)
return unless parsed_resp.is_a?(Hash)
parsed_resp
.fetch('results', [])
.fetch(0, {})
.fetch('error', nil)
end
def setup_ssl(http)
http.use_ssl = config.use_ssl
http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless config.verify_ssl
return http unless config.use_ssl
http.cert_store = generate_cert_store
http
end
def generate_cert_store
store = OpenSSL::X509::Store.new
store.set_default_paths
if config.ssl_ca_cert
if File.directory?(config.ssl_ca_cert)
store.add_path(config.ssl_ca_cert)
else
store.add_file(config.ssl_ca_cert)
end
end
store
end
# Builds an http instance, taking into account any configured
# proxy configuration
def build_http(host, port)
if config.proxy_addr
Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port)
else
Net::HTTP.new(host, port)
end
end
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/AbcSize
end
|