File: logical_count.rb

package info (click to toggle)
groonga 15.2.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 171,500 kB
  • sloc: ansic: 772,536; cpp: 51,530; ruby: 40,538; javascript: 10,250; yacc: 7,045; sh: 5,622; python: 2,821; makefile: 1,677
file content (191 lines) | stat: -rw-r--r-- 5,937 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
module Groonga
  module Sharding
    class LogicalCountCommand < Command
      register("logical_count",
               [
                 "logical_table",
                 "shard_key",
                 "min",
                 "min_border",
                 "max",
                 "max_border",
                 "filter",
                 "post_filter",
               ])

      def run_body(input)
        context = ShardCountContext.new(input)
        begin
          counter = Counter.new(context)
          total = counter.execute
          query_logger.log(:size, ":", "count(#{total})")
          writer.write(total)
        ensure
          context.close
        end
      end

      private
      def cache_key(input)
        key = "logical_count\0"
        key << "#{input[:logical_table]}\0"
        key << "#{input[:shard_key]}\0"
        key << "#{input[:min]}\0"
        key << "#{input[:min_border]}\0"
        key << "#{input[:max]}\0"
        key << "#{input[:max_border]}\0"
        key << "#{input[:filter]}\0"
        key << "#{input[:post_filter]}\0"
        dynamic_columns = DynamicColumns.parse("[logical_count]", input)
        key << dynamic_columns.cache_key
        key
      end

      class ShardCountContext < StreamExecuteContext
        def initialize(input)
          super("logical_count", input)
        end
      end

      class Counter < StreamExecutor
        def initialize(context)
          super(context, ShardCountExecutor)
        end

        def execute
          have_shard = false
          total = 0
          each_shard_executor do |shard_executor|
            have_shard = true
            total += shard_executor.execute
          end
          unless have_shard
            enumerator = @context.enumerator
            message =
              "[logical_count] no shard exists: " +
              "logical_table: <#{enumerator.logical_table}>: " +
              "shard_key: <#{enumerator.shard_key_name}>"
            raise InvalidArgument, message
          end
          total
        end
      end

      class ShardCountExecutor < StreamShardExecutor
        def initialize(context, shard, shard_range)
          super("logical_count", context, shard, shard_range)
        end

        def execute
          ensure_prepared
          if @range_index
            return count_n_records_in_range
          end

          ensure_filtered
          return 0 if @filtered_result_sets.empty?

          total = 0
          each_result_set do |result_set|
            total += result_set.size
          end
          return total
        end

        def find_range_index
          if @filter or @post_filter
            log_use_range_index(false, "need filter",
                                __LINE__, __method__)
            nil
          elsif @cover_type == :all
            log_use_range_index(false, "covered",
                                __LINE__, __method__)
            nil
          else
            find_range_index_raw("range index is available",
                                __LINE__, __method__)
          end
        end

        def execute_filter(range_index)
          if @cover_type == :all and @filter.nil?
            if @post_filter and @context.dynamic_columns.have_filtered?
              filtered_table = @target_table.select_all
              @temporary_tables << filtered_table
              @filtered_result_sets << filtered_table
            else
              @filtered_result_sets << @target_table
            end
          else
            expression = Expression.create(@target_table)
            begin
              expression_builder = RangeExpressionBuilder.new(@shard_key,
                                                              @target_range)
              expression_builder.filter = @filter
              case @cover_type
              when :all
                expression_builder.build_all(expression)
              when :partial_min
                expression_builder.build_partial_min(expression)
              when :partial_max
                expression_builder.build_partial_max(expression)
              when :partial_min_and_max
                expression_builder.build_partial_min_and_max(expression)
              end
              filtered_table = @target_table.select(expression)
            ensure
              expression.close
            end
            @temporary_tables << filtered_table
            @filtered_result_sets << filtered_table
          end
        end

        def count_n_records_in_range
          case @cover_type
          when :partial_min
            min = @target_range.min
            min_border = @target_range.min_border
            max = nil
            max_bordre = nil
          when :partial_max
            min = nil
            min_bordre = nil
            max = @target_range.max
            max_border = @target_range.max_border
          when :partial_min_and_max
            min = @target_range.min
            min_border = @target_range.min_border
            max = @target_range.max
            max_border = @target_range.max_border
          end

          flags = TableCursorFlags::BY_KEY
          case min_border
          when :include
            flags |= TableCursorFlags::GE
          when :exclude
            flags |= TableCursorFlags::GT
          end
          case max_border
          when :include
            flags |= TableCursorFlags::LE
          when :exclude
            flags |= TableCursorFlags::LT
          end

          lexicon = @range_index.table
          @context.referred_objects << lexicon
          TableCursor.open(lexicon,
                           :min => min,
                           :max => max,
                           :flags => flags) do |table_cursor|
            IndexCursor.open(table_cursor, @range_index) do |index_cursor|
              index_cursor.count
            end
          end
        end
      end
    end
  end
end