File: http.rb

package info (click to toggle)
ruby-influxdb 0.8.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 424 kB
  • sloc: ruby: 3,530; sh: 61; makefile: 7
file content (149 lines) | stat: -rw-r--r-- 4,205 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
require 'uri'
require 'cgi'
require 'net/http'
require 'net/https'

module InfluxDB
  # rubocop:disable Metrics/MethodLength
  # rubocop:disable Metrics/AbcSize
  module HTTP # :nodoc:
    def get(url, options = {})
      connect_with_retry do |http|
        response = do_request http, Net::HTTP::Get.new(url)
        case response
        when Net::HTTPSuccess
          handle_successful_response(response, options)
        when Net::HTTPUnauthorized
          raise InfluxDB::AuthenticationError, response.body
        else
          resolve_error(response.body)
        end
      end
    end

    def post(url, data)
      headers = { "Content-Type" => "application/octet-stream" }
      connect_with_retry do |http|
        response = do_request http, Net::HTTP::Post.new(url, headers), data

        case response
        when Net::HTTPNoContent
          return response
        when Net::HTTPUnauthorized
          raise InfluxDB::AuthenticationError, response.body
        else
          resolve_error(response.body)
        end
      end
    end

    private

    def connect_with_retry
      host = config.next_host
      delay = config.initial_delay
      retry_count = 0

      begin
        http = build_http(host, config.port)
        http.open_timeout = config.open_timeout
        http.read_timeout = config.read_timeout

        http = setup_ssl(http)
        yield http
      rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e
        raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE
      rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e
        retry_count += 1
        unless (config.retry == -1 || retry_count <= config.retry) && !stopped?
          raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but failed."
        end

        log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." }
        sleep delay
        delay = [config.max_delay, delay * 2].min
        retry
      ensure
        http.finish if http.started?
      end
    end

    def do_request(http, req, data = nil)
      req.basic_auth config.username, config.password if basic_auth?
      req.body = data if data
      http.request(req)
    end

    def basic_auth?
      config.auth_method == 'basic_auth'
    end

    def resolve_error(response)
      case response
      when /Couldn\'t find series/
        raise InfluxDB::SeriesNotFound, response
      else
        raise InfluxDB::Error, response
      end
    end

    def handle_successful_response(response, options)
      if options.fetch(:json_streaming, false)
        parsed_response = response.body.each_line.with_object({}) do |line, parsed|
          parsed.merge!(JSON.parse(line)) { |_key, oldval, newval| oldval + newval }
        end
      elsif (body = response.body) && (body != "")
        parsed_response = JSON.parse(response.body)
      end

      errors = errors_from_response(parsed_response)
      raise InfluxDB::QueryError, errors if errors

      options.fetch(:parse, false) ? parsed_response : response
    end

    def errors_from_response(parsed_resp)
      return unless parsed_resp.is_a?(Hash)

      parsed_resp
        .fetch('results', [])
        .fetch(0, {})
        .fetch('error', nil)
    end

    def setup_ssl(http)
      http.use_ssl = config.use_ssl
      http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless config.verify_ssl

      return http unless config.use_ssl

      http.cert_store = generate_cert_store
      http
    end

    def generate_cert_store
      store = OpenSSL::X509::Store.new
      store.set_default_paths
      if config.ssl_ca_cert
        if File.directory?(config.ssl_ca_cert)
          store.add_path(config.ssl_ca_cert)
        else
          store.add_file(config.ssl_ca_cert)
        end
      end
      store
    end

    # Builds an http instance, taking into account any configured
    # proxy configuration
    def build_http(host, port)
      if config.proxy_addr
        Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port)
      else
        Net::HTTP.new(host, port)
      end
    end
  end
  # rubocop:enable Metrics/MethodLength
  # rubocop:enable Metrics/AbcSize
end