File: emitters.rb

package info (click to toggle)
ruby-snowplow-tracker 0.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 176 kB
  • sloc: ruby: 515; makefile: 4
file content (465 lines) | stat: -rw-r--r-- 17,622 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0,
# and you may not use this file except in compliance with the Apache License Version 2.0.
# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the Apache License Version 2.0 is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.

# Author:: Snowplow Analytics Ltd
# Copyright:: Copyright (c) 2013-2021 Snowplow Analytics Ltd
# License:: Apache License Version 2.0


require 'logger'
require 'monitor'
require 'net/https'
require 'set'

module SnowplowTracker
  # @see Emitter
  # For logging Emitter activity messages.
  # Module-level default logger, shared by all Emitters that are not given
  # their own `logger` option. Writes to STDERR; the level can be changed at
  # any time, e.g. `SnowplowTracker::LOGGER.level = Logger::DEBUG`.
  LOGGER = Logger.new(STDERR)
  LOGGER.level = Logger::INFO

  # This class sends events to the event collector. All {Tracker}s must have at
  # least one associated Emitter or the subclass AsyncEmitter.
  #
  # The network settings are defined as part of the Emitter initalization. This
  # table displays the default Emitter settings:
  #
  # | Property | Default setting |
  # | --- | --- |
  # | Protocol | HTTP |
  # | Method | GET |
  # | Buffer size | 1 |
  # | Path | `/i` |
  #
  # The buffer size is the number of events which will be buffered before they
  # are all sent simultaneously. The process of sending all buffered events is
  # called "flushing". The default buffer size is 1 because GET requests can
  # only contain one event.
  #
  # If you choose to use POST requests, the buffer_size defaults to 10, and the
  # buffered events are all sent together in a single request. The default path
  # is '/com.snowplowanalytics.snowplow/tp2' for Emitters using POST.
  #
  # # Logging
  # Emitters log their activity to STDERR by default, using the Ruby standard
  # library Logger class. A different logger can be configured during Emitter
  # initialization. For example, to disable logging, you could provide
  # `Logger.new(IO::NULL)` in the options hash.
  #
  # By default, only messages with priority "INFO" or higher will be logged.
  # This can be changed at any time for the default logger, which is saved as a
  # module constant (`LOGGER = Logger.new(STDERR)`). If you are not using the
  # default logger, set the message level before initializing your Emitter.
  #
  # @see https://ruby-doc.org/stdlib-2.7.2/libdoc/logger/rdoc/Logger.html Logger documentation
  #
  # @example Changing the logger message level.
  #   require 'logger'
  #   SnowplowTracker::LOGGER.level = Logger::DEBUG
  class Emitter
    # Default Emitter settings. Frozen so the shared constant cannot be
    # mutated; {#initialize} copies it via `merge` before applying options.
    DEFAULT_CONFIG = {
      protocol: 'http',
      method: 'get'
    }.freeze

    # @private
    attr_reader :logger

    # Create a new Emitter instance. The endpoint is required.
    #
    # @example Initializing an Emitter with all the possible extra configuration.
    #   success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
    #   failure_callback = ->(success_count, failures) do
    #     puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
    #   end
    #
    #   SnowplowTracker::Emitter.new(endpoint: 'collector.example.com',
    #               options: { path: '/my-pipeline/1',
    #                          protocol: 'https',
    #                          port: 443,
    #                          method: 'post',
    #                          buffer_size: 5,
    #                          on_success: success_callback,
    #                          on_failure: failure_callback,
    #                          logger: Logger.new(STDOUT) })
    #
    # The options hash can have any of these optional parameters:
    #
    # | Parameter | Description | Type |
    # | --- | --- | --- |
    # | path | Override the default path for appending to the endpoint | String |
    # | protocol | 'http' or 'https' | String |
    # | port | The port for the connection | Integer |
    # | method | 'get' or 'post' | String |
    # | buffer_size | Number of events to send at once | Integer |
    # | on_success | A method to call if events were sent successfully | Method |
    # | on_failure | A method to call if events did not send | Method |
    # | thread_count | Number of threads to use | Integer |
    # | logger | Log somewhere other than STDERR | Logger |
    #
    # Note that `thread_count` is relevant only to the subclass {AsyncEmitter},
    # and will be ignored if provided to an Emitter.
    #
    # If you choose to use HTTPS, we recommend using port 443.
    #
    # Only 2xx and 3xx status codes are considered successes.
    #
    # The `on_success` callback should accept one argument: the number of
    # requests sent this way. The `on_failure` callback should accept two
    # arguments: the number of successfully sent events, and an array containing
    # the unsuccessful events.
    #
    # @param endpoint [String] the endpoint to send the events to
    # @param options [Hash] allowed configuration options
    #
    # @see AsyncEmitter#initialize
    # @api public
    def initialize(endpoint:, options: {})
      config = DEFAULT_CONFIG.merge(options)
      @lock = Monitor.new
      path = confirm_path(config)
      @collector_uri = create_collector_uri(endpoint, config[:protocol], config[:port], path)
      @buffer = []
      @buffer_size = confirm_buffer_size(config)
      @method = config[:method]
      @on_success = config[:on_success]
      @on_failure = config[:on_failure]
      @logger = config[:logger] || LOGGER
      logger.info("#{self.class} initialized with endpoint #{@collector_uri}")
    end

    # Creates the `@buffer_size` variable during initialization. Unless
    # otherwise defined, it's 1 for Emitters using GET and 10 for Emitters using
    # POST requests.
    # @private
    def confirm_buffer_size(config)
      return config[:buffer_size] unless config[:buffer_size].nil?

      config[:method] == 'get' ? 1 : 10
    end

    # Creates the `@path` variable during initialization. Allows a non-standard
    # path to be provided.
    # @private
    def confirm_path(config)
      return config[:path] unless config[:path].nil?

      config[:method] == 'get' ? '/i' : '/com.snowplowanalytics.snowplow/tp2'
    end

    # Creates the `@collector_uri` variable during initialization.
    # The default is "http://{endpoint}/i".
    # @private
    def create_collector_uri(endpoint, protocol, port, path)
      port_string = port.nil? ? '' : ":#{port}"

      "#{protocol}://#{endpoint}#{port_string}#{path}"
    end

    # Add an event to the buffer and flush it if maximum size has been reached.
    # This method is not required for standard Ruby tracker usage. A {Tracker}
    # privately calls this method once the event payload is ready to send.
    #
    # We have included it as part of the public API for its possible use in the
    # `on_failure` callback. This is the optional method, provided in the
    # `options` Emitter initalization hash, that is called when events fail
    # to send. You could use {#input} as part of your callback to immediately
    # retry the failed event.
    #
    # The `on_failure` callback should accept two arguments: the number of
    # successfully sent events, and an array containing the unsuccessful events.
    #
    # @example A possible `on_failure` method using `#input`
    #   def retry_on_failure(failed_event_count, failed_events)
    #     # possible backoff-and-retry timeout here
    #     failed_events.each do |event|
    #       my_emitter.input(event)
    #     end
    #   end
    #
    # @api public
    def input(payload)
      # Stringify all values in place for the wire format. The mutation is
      # deliberate and idempotent, so re-inputting a failed event is safe.
      payload.each { |k, v| payload[k] = v.to_s }
      @lock.synchronize do
        @buffer.push(payload)
        flush if @buffer.size >= @buffer_size
      end

      nil
    end

    # Flush the Emitter, forcing it to send all the events in its
    # buffer, even if the buffer is not full. {Emitter} objects, unlike
    # {AsyncEmitter}s, can only `flush` synchronously. A {Tracker} can manually flush all
    # its Emitters by calling {Tracker#flush}, part of the public API which
    # calls this method.
    #
    # The unused async parameter here is to avoid ArgumentError, since
    # {AsyncEmitter#flush} does take an argument.
    #
    # @see AsyncEmitter#flush
    # @private
    def flush(_async = true)
      @lock.synchronize do
        send_requests(@buffer)
        @buffer = []
      end

      nil
    end

    # Send all events in the buffer to the collector
    # @private
    def send_requests(events)
      if events.empty?
        logger.info('Skipping sending events since buffer is empty')
        return
      end

      logger.info("Attempting to send #{events.size} request#{events.size == 1 ? '' : 's'}")

      events.each do |event|
        # add the sent timestamp, overwrite if already exists
        event['stm'] = Timestamp.create.to_s
      end

      if @method == 'post'
        send_requests_with_post(events)
      elsif @method == 'get'
        send_requests_with_get(events)
      end

      nil
    end

    # Part of {#send_requests}. Sends all events in one POST request and fires
    # the `on_success`/`on_failure` callback as appropriate. Network errors are
    # logged and treated as a failed send rather than raised.
    # @private
    def send_requests_with_post(events)
      post_succeeded = false
      begin
        request = http_post(SelfDescribingJson.new(
          'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
          events
        ).to_json)
        post_succeeded = good_status_code?(request.code)
      rescue StandardError => standard_error
        logger.warn(standard_error)
      end

      if post_succeeded
        @on_success.call(events.size) unless @on_success.nil?
      else
        @on_failure.call(0, events) unless @on_failure.nil?
      end

      nil
    end

    # Part of {#send_requests}. Sends one GET request per event, then fires
    # `on_success` if all succeeded, or `on_failure` with the failed events.
    # @private
    def send_requests_with_get(events)
      success_count = 0
      unsent_requests = []

      events.each do |event|
        success = process_get_event(event)
        success ? success_count += 1 : unsent_requests << event
      end

      if unsent_requests.empty?
        @on_success.call(success_count) unless @on_success.nil?
      else
        @on_failure.call(success_count, unsent_requests) unless @on_failure.nil?
      end

      nil
    end

    # Part of {#send_requests_with_get}.
    # @return [Boolean] true if the GET received a 2xx/3xx response
    # @private
    def process_get_event(event)
      get_succeeded = false
      begin
        request = http_get(event)
        get_succeeded = good_status_code?(request.code)
      rescue StandardError => standard_error
        logger.warn(standard_error)
      end
      get_succeeded
    end

    # Part of {#process_get_event}. This sends a GET request.
    # @private
    def http_get(payload)
      destination = URI(@collector_uri + '?' + URI.encode_www_form(payload))
      logger.info("Sending GET request to #{@collector_uri}...")
      logger.debug("Payload: #{payload}")
      http = create_http(destination)
      request = Net::HTTP::Get.new(destination.request_uri)
      response = http.request(request)
      logger.add(good_status_code?(response.code) ? Logger::INFO : Logger::WARN) do
        "GET request to #{@collector_uri} finished with status code #{response.code}"
      end

      response
    end

    # Part of {#send_requests_with_post}. This sends a POST request.
    # @private
    def http_post(payload)
      logger.info("Sending POST request to #{@collector_uri}...")
      logger.debug("Payload: #{payload}")
      destination = URI(@collector_uri)
      http = create_http(destination)
      request = Net::HTTP::Post.new(destination.request_uri)
      request.body = payload.to_json
      request.set_content_type('application/json; charset=utf-8')
      response = http.request(request)
      logger.add(good_status_code?(response.code) ? Logger::INFO : Logger::WARN) do
        "POST request to #{@collector_uri} finished with status code #{response.code}"
      end

      response
    end

    # Shared connection setup for {#http_get} and {#http_post}: builds a
    # Net::HTTP for the destination, enabling SSL for https URIs.
    # @private
    def create_http(destination)
      http = Net::HTTP.new(destination.host, destination.port)
      http.use_ssl = true if destination.scheme == 'https'
      http
    end

    # Check if the response is good.
    # Only 2xx and 3xx status codes are considered successes.
    # @private
    def good_status_code?(status_code)
      code = status_code.to_i
      code >= 200 && code < 400
    end

    private :create_collector_uri,
            :create_http,
            :http_get,
            :http_post
  end

  # This {Emitter} subclass provides asynchronous event sending. Whenever the
  # buffer is flushed, the AsyncEmitter places the flushed events in a work
  # queue. The AsyncEmitter asynchronously sends events in this queue using a
  # thread pool of a fixed size. The size of the thread pool is 1 by default,
  # but can be configured as part of the options hash during initialization.
  #
  # @see Emitter
  # @api public
  class AsyncEmitter < Emitter
    # Create a new AsyncEmitter object. The endpoint is required.
    #
    # @example Initializing an AsyncEmitter with all the possible extra configuration.
    #   success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
    #   failure_callback = ->(success_count, failures) do
    #     puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
    #   end
    #
    #   SnowplowTracker::AsyncEmitter.new(endpoint: 'collector.example.com',
    #               options: { path: '/my-pipeline/1',
    #                          protocol: 'https',
    #                          port: 443,
    #                          method: 'post',
    #                          buffer_size: 5,
    #                          on_success: success_callback,
    #                          on_failure: failure_callback,
    #                          logger: Logger.new(STDOUT),
    #                          thread_count: 5 })
    #
    # The options hash can have any of these optional parameters:
    #
    # | Parameter | Description | Type |
    # | --- | --- | --- |
    # | path | Override the default path for appending to the endpoint | String |
    # | protocol | 'http' or 'https' | String |
    # | port | The port for the connection | Integer |
    # | method | 'get' or 'post' | String |
    # | buffer_size | Number of events to send at once | Integer |
    # | on_success | A function to call if events were sent successfully | Function |
    # | on_failure | A function to call if events did not send | Function |
    # | thread_count | Number of threads to use | Integer |
    # | logger | Log somewhere other than STDERR | Logger |
    #
    # The `thread_count` determines the number of worker threads which will be
    # used to send events.
    #
    # If you choose to use HTTPS, we recommend using port 443.
    #
    # Only 2xx and 3xx status codes are considered successes.
    #
    # The `on_success` callback should accept one argument: the number of
    # requests sent this way. The `on_failure` callback should accept two
    # arguments: the number of successfully sent events, and an array containing
    # the unsuccessful events.
    #
    # @note if you test the AsyncEmitter by using a short script to send an
    #   event, you may find that the event fails to send. This is because the
    #   process exits before the flushing thread is finished. You can get round
    #   this either by adding a sleep(10) to the end of your script or by using
    #   the synchronous flush.
    #
    # @param endpoint [String] the endpoint to send the events to
    # @param options [Hash] allowed configuration options
    #
    # @see Emitter#initialize
    # @api public
    def initialize(endpoint:, options: {})
      @queue = Queue.new
      # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
      @queue.extend(MonitorMixin)
      @all_processed_condition = @queue.new_cond
      @results_unprocessed = 0
      # Worker threads start before `super` finishes initialization; this is
      # safe because the queue is empty, so each worker blocks in `@queue.pop`
      # until the first flush enqueues work (by which time `super` has run).
      (options[:thread_count] || 1).times { Thread.new { consume } }
      super(endpoint: endpoint, options: options)
    end

    # AsyncEmitters use the MonitorMixin module, which provides the
    # `synchronize` and `broadcast` methods.
    # Worker loop: blocks on the queue, sends each dequeued batch of events,
    # then decrements the unprocessed counter and wakes any thread waiting in
    # a synchronous {#flush}. Runs forever; threads die with the process.
    # @private
    def consume
      loop do
        work_unit = @queue.pop
        send_requests(work_unit)
        @queue.synchronize do
          @results_unprocessed -= 1
          @all_processed_condition.broadcast
        end
      end
    end

    # Flush the Emitter, forcing it to send all the events in its buffer, even
    # if the buffer is not full.
    #
    # If `async` is true (the default), events are sent even if the queue is not
    # empty. If `async` is false, it blocks until all queued events have been
    # sent. Note that this method can be called by public API method
    # {Tracker#flush}, which has a default of `async` being false.
    #
    # @param async [Bool] whether to flush asynchronously or not
    #
    # @see Emitter#flush
    # @private
    def flush(async = true)
      loop do
        @lock.synchronize do
          # Count the batch as unprocessed before enqueueing it, so a
          # synchronous waiter cannot observe zero between the two steps.
          @queue.synchronize { @results_unprocessed += 1 }
          @queue << @buffer
          @buffer = []
        end
        unless async
          logger.info('Starting synchronous flush')
          @queue.synchronize do
            @all_processed_condition.wait_while { @results_unprocessed > 0 }
            logger.info('Finished synchronous flush')
          end
        end
        # NOTE(review): this read of @buffer happens outside @lock, so an event
        # arriving concurrently via #input may or may not trigger another loop
        # iteration — appears to be a deliberate best-effort check; confirm.
        break if @buffer.empty?
      end
    end
  end
end