File: batch.rb

package info (click to toggle)
ruby-influxdb 0.8.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 424 kB
  • sloc: ruby: 3,530; sh: 61; makefile: 7
file content (97 lines) | stat: -rw-r--r-- 2,705 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
module InfluxDB
  module Query
    # Batch collects multiple queries and executes them together in a single
    # request against the "/query" endpoint.
    #
    # You shouldn't use Batch directly, instead call Client.batch, which
    # constructs a new batch for you.
    class Batch
      # @return [Object] the client all requests are delegated to
      # @return [Array] the statements built so far, in insertion order
      attr_reader :client, :statements

      # Creates an empty batch bound to +client+.
      #
      # @param client [Object] object providing +builder+ and the delegated
      #   helper methods listed at the bottom of this class
      # @yield [batch] optionally yields the new batch so statements can be
      #   added inline
      def initialize(client)
        @client     = client
        @statements = []

        yield self if block_given?
      end

      # Builds a statement from +query+ and appends it to the batch.
      #
      # A single trailing ";" is removed first, because #execute joins all
      # statements with ";" itself.
      #
      # @param query [String] the query text
      # @param params [Object, nil] passed through to +client.builder.build+
      # @return [Integer] zero-based position of the statement in the batch
      def add(query, params: nil)
        statements << client.builder.build(query.chomp(";"), params)
        statements.size - 1
      end

      # Sends all collected statements in one request and returns (or yields)
      # the series grouped by statement id.
      #
      # NOTE(review): +chunk_size+ is consumed by this signature, so it is
      # not part of +opts+ and never reaches +query_params+; here it only
      # toggles +json_streaming+. Confirm whether it should be forwarded.
      #
      # @param denormalize [Boolean] when truthy, series are converted with
      #   the client's denormalize helpers, otherwise with +raw_values+
      # @param chunk_size [Object, nil] any non-nil value enables
      #   +json_streaming+ on the request
      # @param opts [Hash] remaining keywords, forwarded to +query_params+
      # @yield [id, name, tags, values] once per series; statements without
      #   any series yield +id, nil, {}, []+
      # @return [Array] without a block: one entry per statement id;
      #   an empty Array when the batch holds no statements
      def execute(
        denormalize:  config.denormalize,
        chunk_size:   config.chunk_size,
        **opts,
        &block
      )
        return [] if statements.empty?

        url = full_url "/query".freeze, **query_params(statements.join(";"), **opts)
        series = fetch_series get(url, parse: true, json_streaming: !chunk_size.nil?)

        if denormalize
          build_denormalized_result(series, &block)
        else
          build_result(series, &block)
        end
      end

      private

      # Returns the raw series lists, or yields every series with its values
      # extracted by +raw_values+ when a block is given.
      #
      # @param series [Hash] statement id => list of series hashes
      def build_result(series)
        return series.values unless block_given?

        series.each do |id, statement_results|
          statement_results.each do |s|
            yield id, s["name".freeze], s["tags".freeze], raw_values(s)
          end

          # indicate empty result: yield useful amount of "nothing"
          yield id, nil, {}, [] if statement_results.empty?
        end
      end

      # Same contract as #build_result, but values are converted through the
      # client's +denormalized_series_list+ / +denormalize_series+ helpers.
      #
      # @param series [Hash] statement id => list of series hashes
      def build_denormalized_result(series)
        return series.map { |_, s| denormalized_series_list(s) } unless block_given?

        series.each do |id, statement_results|
          statement_results.each do |s|
            yield id, s["name".freeze], s["tags".freeze], denormalize_series(s)
          end

          # indicate empty result: yield useful amount of "nothing"
          yield id, nil, {}, [] if statement_results.empty?
        end
      end

      # Groups the series of a parsed response by statement id.
      #
      # The same statement id may occur in more than one result object
      # (presumably when the response was delivered in chunks -- TODO
      # confirm against the client's +get+). Series are therefore appended
      # per id instead of assigned, so earlier entries are not overwritten.
      # A fresh Array is built per id; the response itself is not mutated.
      #
      # @param response [Hash] parsed response containing a "results" list
      # @return [Hash] statement id => Array of series hashes
      # @raise [KeyError] if +response+ has no "results" key
      def fetch_series(response)
        response.fetch("results".freeze).each_with_object({}) do |result, list|
          sid = result["statement_id".freeze]
          (list[sid] ||= []).concat(result.fetch("series".freeze, []))
        end
      end

      # build simple method delegators
      #
      # Each listed name becomes a method forwarding its arguments to the
      # client. `send` bypasses visibility, so this also works for client
      # methods that are not public.
      %i[
        config
        full_url
        query_params
        get
        raw_values
        denormalize_series
        denormalized_series_list
      ].each do |method_name|
        if RUBY_VERSION < "2.7"
          # Before Ruby 2.7 keyword arguments travel as a trailing Hash
          # inside *args, so a plain splat is sufficient.
          define_method(method_name) do |*args|
            client.send method_name, *args
          end
        else
          # From Ruby 2.7 on keywords are captured and forwarded explicitly.
          define_method(method_name) do |*args, **kwargs|
            client.send method_name, *args, **kwargs
          end
        end
      end
    end
  end
end