File: elastic_rest.rb

package info (click to toggle)
puppet-module-voxpupuli-elasticsearch 9.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,496 kB
  • sloc: ruby: 9,906; sh: 392; makefile: 4
file content (286 lines) | stat: -rw-r--r-- 8,687 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# frozen_string_literal: true

require 'json'
require 'net/http'
require 'openssl'

# Parent class encapsulating general-use functions for children REST-based
# providers.
class Puppet::Provider::ElasticREST < Puppet::Provider
  class << self
    attr_accessor :api_discovery_uri, :api_resource_style, :api_uri, :discrete_resource_creation, :metadata, :metadata_pipeline, :query_string
  end

  # Fetch arbitrary metadata for the class from an instance object.
  #
  # @return String
  def metadata
    self.class.metadata
  end

  # Retrieve the class query_string variable
  #
  # @return String
  def query_string
    self.class.query_string
  end

  # Perform a REST API request against the indicated endpoint.
  #
  # @return Net::HTTPResponse
  def self.rest(http,
                req,
                timeout = 10,
                username = nil,
                password = nil,
                validate_tls: true)

    if username && password
      req.basic_auth username, password
    elsif username || password
      Puppet.warning(
        'username and password must both be defined, skipping basic auth'
      )
    end

    req['Accept'] = 'application/json'

    http.read_timeout = timeout
    http.open_timeout = timeout
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless validate_tls

    begin
      http.request req
    rescue EOFError => e
      # Because the provider attempts a best guess at API access, we
      # only fail when HTTP operations fail for mutating methods.
      unless %w[GET OPTIONS HEAD].include? req.method
        raise Puppet::Error,
              "Received '#{e}' from the Elasticsearch API. Are your API settings correct?"
      end
    end
  end

  # Helper to format a remote URL request for Elasticsearch which takes into
  # account path ordering, et cetera.
  def self.format_uri(resource_path, property_flush = {})
    return api_uri if resource_path.nil? || api_resource_style == :bare

    if discrete_resource_creation && !property_flush[:ensure].nil?
      resource_path
    else
      case api_resource_style
      when :prefix
        "#{resource_path}/#{api_uri}"
      else
        "#{api_uri}/#{resource_path}"
      end
    end
  end

  # Fetch Elasticsearch API objects. Accepts a variety of argument functions
  # dictating how to connect to the Elasticsearch API.
  #
  # @return Array
  #   an array of Hashes representing the found API objects, whether they be
  #   templates, pipelines, et cetera.
  def self.api_objects(protocol = 'http',
                       host = 'localhost',
                       port = 9200,
                       timeout = 10,
                       username = nil,
                       password = nil,
                       ca_file = nil,
                       ca_path = nil,
                       validate_tls = true)

    uri = URI("#{protocol}://#{host}:#{port}/#{format_uri(api_discovery_uri)}")
    http = Net::HTTP.new uri.host, uri.port
    req = Net::HTTP::Get.new uri.request_uri

    http.use_ssl = uri.scheme == 'https'
    [[ca_file, :ca_file=], [ca_path, :ca_path=]].each do |arg, method|
      http.send method, arg if arg && http.respond_to?(method)
    end

    response = rest http, req, timeout, username, password, validate_tls: validate_tls

    results = []

    results = process_body(response.body) if response.respond_to?(:code) && response.code.to_i == 200

    results
  end

  # Process the JSON response body
  def self.process_body(body)
    JSON.parse(body).map do |object_name, api_object|
      {
        :name => object_name,
        :ensure => :present,
        metadata => process_metadata(api_object),
        :provider => name
      }
    end
  end

  # Passes API objects through arbitrary Procs/lambdas in order to postprocess
  # API responses.
  def self.process_metadata(raw_metadata)
    if metadata_pipeline.is_a?(Array) && !metadata_pipeline.empty?
      metadata_pipeline.reduce(raw_metadata) do |md, processor|
        processor.call md
      end
    else
      raw_metadata
    end
  end

  # Fetch an array of provider objects from the Elasticsearch API.
  def self.instances
    api_objects.map { |resource| new resource }
  end

  # Unlike a typical #prefetch, which just ties discovered #instances to the
  # correct resources, we need to quantify all the ways the resources in the
  # catalog know about Elasticsearch API access and use those settings to
  # fetch any templates we can before associating resources and providers.
  def self.prefetch(resources)
    # Get all relevant API access methods from the resources we know about
    res = resources.map do |_, resource|
      p = resource.parameters
      [
        p[:protocol].value,
        p[:host].value,
        p[:port].value,
        p[:timeout].value,
        (p.key?(:username) ? p[:username].value : nil),
        (p.key?(:password) ? p[:password].value : nil),
        (p.key?(:ca_file) ? p[:ca_file].value : nil),
        (p.key?(:ca_path) ? p[:ca_path].value : nil),
        (p.key?(:validate_tls) ? p[:validate_tls].value : true),
      ]
      # Deduplicate identical settings, and fetch templates
    end.uniq
    res = res.map do |api|
      api_objects(*api)
      # Flatten and deduplicate the array, instantiate providers, and do the
      # typical association dance
    end
    res.flatten.uniq.map { |resource| new resource }.each do |prov|
      if (resource = resources[prov.name])
        resource.provider = prov
      end
    end
  end

  def initialize(value = {})
    super(value)
    @property_flush = {}
  end

  # Generate a request body
  def generate_body
    JSON.generate(
      if metadata != :content && @property_flush[:ensure] == :present
        { metadata.to_s => resource[metadata] }
      else
        resource[metadata]
      end
    )
  end

  # Call Elasticsearch's REST API to appropriately PUT/DELETE/or otherwise
  # update any managed API objects.
  def flush
    Puppet.debug('Got to flush')
    uri = URI(
      format(
        '%s://%s:%d/%s',
        resource[:protocol],
        resource[:host],
        resource[:port],
        self.class.format_uri(resource[:name], @property_flush)
      )
    )
    uri.query = URI.encode_www_form query_string if query_string

    Puppet.debug("Generated URI = #{uri.inspect}")

    case @property_flush[:ensure]
    when :absent
      req = Net::HTTP::Delete.new uri.request_uri
    else
      req = Net::HTTP::Put.new uri.request_uri
      req.body = generate_body
      Puppet.debug("Generated body looks like: #{req.body.inspect}")
      # As of Elasticsearch 6.x, required when requesting with a payload (so we
      # set it always to be safe)
      req['Content-Type'] = 'application/json' if req['Content-Type'].nil?
    end

    http = Net::HTTP.new uri.host, uri.port
    http.use_ssl = uri.scheme == 'https'
    %i[ca_file ca_path].each do |arg|
      http.send "#{arg}=".to_sym, resource[arg] if !resource[arg].nil? && http.respond_to?(arg)
    end

    response = self.class.rest(
      http,
      req,
      resource[:timeout],
      resource[:username],
      resource[:password],
      validate_tls: resource[:validate_tls]
    )

    # Attempt to return useful error output
    unless response.code.to_i == 200
      Puppet.debug("Non-OK reponse: Body = #{response.body.inspect}")
      json = JSON.parse(response.body)

      err_msg = if json.key? 'error'
                  if json['error'].is_a?(Hash) \
                      && json['error'].key?('root_cause')
                    # Newer versions have useful output
                    json['error']['root_cause'].first['reason']
                  else
                    # Otherwise fallback to old-style error messages
                    json['error']
                  end
                else
                  # As a last resort, return the response error code
                  "HTTP #{response.code}"
                end

      raise Puppet::Error, "Elasticsearch API responded with: #{err_msg}"
    end
    @property_hash = self.class.api_objects(
      resource[:protocol],
      resource[:host],
      resource[:port],
      resource[:timeout],
      resource[:username],
      resource[:password],
      resource[:ca_file],
      resource[:ca_path],
      resource[:validate_tls].nil? ? true : resource[:validate_tls]
    ).find do |t|
      t[:name] == resource[:name]
    end
  end

  # Set this provider's `:ensure` property to `:present`.
  def create
    @property_flush[:ensure] = :present
  end

  def exists?
    @property_hash[:ensure] == :present
  end

  # Set this provider's `:ensure` property to `:absent`.
  def destroy
    @property_flush[:ensure] = :absent
  end
end