File: stream_shard_executor.rb

package info (click to toggle)
groonga 15.2.1+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 171,500 kB
  • sloc: ansic: 772,536; cpp: 51,530; ruby: 40,538; javascript: 10,250; yacc: 7,045; sh: 5,622; python: 2,821; makefile: 1,677
file content (265 lines) | stat: -rw-r--r-- 7,948 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
module Groonga
  module Sharding
    # Executes a command against a single shard. Executors of neighbouring
    # shards can be linked through previous_executor / next_executor so that
    # their tables are passed as context-only targets when dynamic columns
    # are applied.
    class StreamShardExecutor
      # Derived class must implement methods below.
      # - execute_filter(range_index)
      # - find_range_index

      include Loggable

      attr_reader :shard
      attr_writer :previous_executor
      attr_writer :next_executor

      # @param command_name [String] used as the "[command_name]" tag in log
      #   and error messages
      # @param context command context; this class reads filter, post_filter,
      #   temporary_tables, enumerator, dynamic_columns, referred_objects,
      #   need_look_ahead? and time_classify_types from it
      # @param shard the shard to execute against (key, key_name, table,
      #   table_name are read)
      # @param shard_range range covered by the shard; compared against the
      #   command's target range to compute @cover_type
      def initialize(command_name, context, shard, shard_range)
        @command_name = command_name
        @context = context
        @shard = shard
        @shard_range = shard_range

        @shard_key = shard.key
        @target_table = @shard.table

        @filter = @context.filter
        @post_filter = @context.post_filter
        @filtered_result_sets = []
        @temporary_tables = @context.temporary_tables

        @target_range = @context.enumerator.target_range

        @cover_type = @target_range.cover_type(@shard_range)

        @previous_executor = nil
        @next_executor = nil

        # Both are assigned lazily by ensure_prepared; initialize them here
        # like every other ivar so readers never see an undefined ivar.
        @expression_builder = nil
        @initial_table = nil

        @range_index = nil
        @window = nil

        @prepared = false
        @filtered = false
      end

      # The following methods are invoked on *other* executors
      # (@previous_executor / @next_executor) with an explicit receiver, so
      # they must not be private: Ruby rejects private calls that have an
      # explicit receiver. Protected still keeps them off the public API.
      protected

      # Appends this shard's initial-stage table, if one was created by
      # ensure_prepared, to apply_targets as a context-only target.
      #
      # @param apply_targets [Array] target list that is mutated in place
      def add_initial_stage_context(apply_targets)
        ensure_prepared
        return unless @initial_table
        apply_targets << [@initial_table, { context: true }]
      end

      # Runs filtering for this shard (once) and appends every filtered
      # result set to apply_targets as a context-only target.
      #
      # @param apply_targets [Array] target list that is mutated in place
      def add_filtered_stage_context(apply_targets)
        ensure_filtered
        @filtered_result_sets.each do |table|
          apply_targets << [table, { context: true }]
        end
      end

      private

      # @return [Boolean] false when the shard is outside the target range
      #   (cover type :none) or its table is empty; true otherwise
      def have_record?
        return false if @cover_type == :none
        return false if @target_table.empty?
        true
      end

      # Yields each filtered result set of this shard.
      #
      # With a window (see detect_window), each filtered result set is passed
      # through @window.each and every yielded windowed result set gets the
      # filtered-stage dynamic columns and the post filter applied before
      # being yielded. Without a window, filtered-stage dynamic columns are
      # applied once across all filtered result sets (with neighbouring
      # executors' results as context), then the post filter is applied per
      # result set. Tables created here are registered in @temporary_tables.
      def each_result_set
        if @window
          @filtered_result_sets.each do |result_set|
            @window.each(result_set) do |windowed_result_set|
              @temporary_tables << windowed_result_set
              if @context.dynamic_columns.have_filtered?
                apply_targets = [[windowed_result_set]]
                @context.dynamic_columns.apply_filtered(apply_targets)
              end
              unless @post_filter.nil?
                windowed_result_set = apply_post_filter(windowed_result_set)
                @temporary_tables << windowed_result_set
              end
              yield(windowed_result_set)
            end
          end
        else
          apply_filtered_dynamic_columns
          @filtered_result_sets.each do |result_set|
            unless @post_filter.nil?
              result_set = apply_post_filter(result_set)
              @temporary_tables << result_set
            end
            yield(result_set)
          end
        end
      end

      # One-time preparation: validates the shard key, builds the range
      # expression builder, looks up the range index and, when initial-stage
      # dynamic columns exist, materializes the in-range part of the shard
      # table as @initial_table. Finally detects the window to use.
      #
      # @raise [InvalidArgument] if the shard has records but no shard key
      def ensure_prepared
        return if @prepared
        @prepared = true

        return unless have_record?

        if @shard_key.nil?
          message = "[#{@command_name}] shard_key doesn't exist: " +
                    "<#{@shard.key_name}>"
          raise InvalidArgument, message
        end

        @expression_builder = RangeExpressionBuilder.new(@shard_key,
                                                         @target_range)
        @expression_builder.filter = @filter
        @range_index = find_range_index

        if @context.dynamic_columns.have_initial?
          if @cover_type == :all
            @target_table = @target_table.select_all
          else
            # Select only by shard range here; the user filter is applied
            # later by execute_filter on the resulting table.
            @expression_builder.filter = nil
            begin
              @target_table = create_expression(@target_table) do |expression|
                @expression_builder.build(expression, @shard_range)
                @target_table.select(expression)
              end
            ensure
              # Always restore the filter: @prepared is already true, so a
              # failure here must not leave the builder without the filter
              # for later execute_filter calls.
              @expression_builder.filter = @filter
            end
            @cover_type = :all
          end
          @temporary_tables << @target_table
          @initial_table = @target_table
        end

        @window = detect_window
      end

      # Looks up an index on the shard key usable with Operator::LESS,
      # records it in @context.referred_objects and logs the decision.
      #
      # @param use_reason [String] reason logged when an index is found
      # @param line [Integer] source line reported in that log entry
      # @param method [Symbol, String] method name reported in that log entry
      # @return the index, or nil when none exists
      def find_range_index_raw(use_reason, line, method)
        index_info = @shard_key.find_index(Operator::LESS)
        if index_info.nil?
          log_use_range_index(false, "no range index", __LINE__, __method__)
          return nil
        end

        index = index_info.index
        @context.referred_objects << index
        log_use_range_index(true, use_reason, line, method)
        index
      end

      # Runs filtering for this shard once: prepares, applies initial-stage
      # dynamic columns (with neighbouring executors' initial tables as
      # context), calls execute_filter and, when the context needs look
      # ahead, applies filtered-stage dynamic columns with
      # window_function: false.
      def ensure_filtered
        return if @filtered

        @filtered = true

        ensure_prepared
        return unless have_record?

        if @context.dynamic_columns.have_initial?
          apply_targets = []
          if @previous_executor
            @previous_executor.add_initial_stage_context(apply_targets)
          end
          apply_targets << [@target_table]
          if @next_executor
            @next_executor.add_initial_stage_context(apply_targets)
          end
          @context.dynamic_columns.apply_initial(apply_targets)
        end

        execute_filter(@range_index)

        return unless @context.need_look_ahead?

        apply_targets = []
        materialize_filtered_result_sets(apply_targets)
        @context.dynamic_columns.apply_filtered(apply_targets,
                                                window_function: false)
      end

      # Creates an Expression for table, yields it and always closes it.
      #
      # @return the block's return value
      def create_expression(table)
        expression = Expression.create(table)
        begin
          yield(expression)
        ensure
          expression.close
        end
      end

      # Writes a DEBUG log entry stating whether the range index is used.
      #
      # @param use [Boolean] true => "[range-index]" tag, false => "[select]"
      # @param reason [String] explanation appended to the message
      # @param line [Integer] source line to report
      # @param method [Symbol, String] method name to report
      def log_use_range_index(use, reason, line, method)
        message = "[#{@command_name}]"
        if use
          message << "[range-index] "
        else
          message << "[select] "
        end
        message << "<#{@shard.table_name}>: "
        message << reason
        logger.log(Logger::Level::DEBUG,
                   __FILE__,
                   line,
                   method.to_s,
                   message)
      end

      # Parses @post_filter against table and returns the selected table.
      # The caller is responsible for registering the result as temporary.
      def apply_post_filter(table)
        create_expression(table) do |expression|
          expression.parse(@post_filter)
          table.select(expression)
        end
      end

      # Selects from @target_table with an expression built by the caller's
      # block; registers the result as temporary and keeps it if non-empty.
      def filter_table
        table = @target_table
        create_expression(table) do |expression|
          yield(expression)
          result_set = table.select(expression)
          @temporary_tables << result_set
          add_filtered_result_set(result_set)
        end
      end

      # Keeps result_set for later stages unless it is empty.
      def add_filtered_result_set(result_set)
        return if result_set.empty?
        @filtered_result_sets << result_set
      end

      # Applies filtered-stage dynamic columns to this shard's filtered
      # result sets, with neighbouring executors' filtered result sets as
      # context-only targets. When look ahead is needed, normal: false is
      # passed.
      def apply_filtered_dynamic_columns
        return unless @context.dynamic_columns.have_filtered?

        apply_targets = []
        if @previous_executor
          @previous_executor.add_filtered_stage_context(apply_targets)
        end
        materialize_filtered_result_sets(apply_targets)
        if @next_executor
          @next_executor.add_filtered_stage_context(apply_targets)
        end
        options = {}
        options[:normal] = false if @context.need_look_ahead?
        @context.dynamic_columns.apply_filtered(apply_targets, options)
      end

      # Replaces any filtered result set that is the shard table itself with
      # a select_all copy (registered as temporary), so apply_targets receives
      # the copy rather than the shard table. Appends each (possibly
      # replaced) result set to apply_targets as a normal target.
      #
      # @param apply_targets [Array] target list that is mutated in place
      def materialize_filtered_result_sets(apply_targets)
        @filtered_result_sets = @filtered_result_sets.collect do |result_set|
          if result_set == @shard.table
            result_set = result_set.select_all
            @temporary_tables << result_set
          end
          apply_targets << [result_set]
          result_set
        end
      end

      # Chooses the window used by each_result_set from the requested
      # time_classify types: "minute"/"second" request a 1-hour window,
      # "day"/"hour" a 1-day window (skipped for day shards). Returns the
      # largest candidate per Window's ordering, or nil when there is none.
      def detect_window
        # TODO: return nil if result_set is small enough
        tag = "[#{@command_name}]"
        windows = []
        @context.time_classify_types.each do |type|
          case type
          when "minute", "second"
            windows << Window.new(@context, @shard, @shard_range, :hour, 1, tag)
          when "day", "hour"
            unless @shard_range.is_a?(LogicalEnumerator::DayShardRange)
              windows << Window.new(@context, @shard, @shard_range, :day, 1, tag)
            end
          end
        end
        windows.max
      end
    end
  end
end