File: stream_execute_context.rb

package info (click to toggle)
groonga 15.2.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 171,500 kB
  • sloc: ansic: 772,536; cpp: 51,530; ruby: 40,538; javascript: 10,250; yacc: 7,045; sh: 5,622; python: 2,821; makefile: 1,677
file content (107 lines) | stat: -rw-r--r-- 3,023 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
module Groonga
  module Sharding
    # Per-command state for a streamed sharding command: the shard
    # enumerator, the filter parameters, the parsed dynamic columns, the
    # requested order and queues of resources that must be released.
    #
    # Resources are tracked in "frames". #push appends a new frame (one
    # array of temporary tables and one array of referred objects),
    # #temporary_tables / #referred_objects expose the newest frame and
    # #shift releases the oldest one.
    class StreamExecuteContext
      # LogicalEnumerator built from the command input.
      attr_reader :enumerator
      # Value of input[:filter] (nil when not given).
      attr_reader :filter
      # Value of input[:post_filter] (nil when not given).
      attr_reader :post_filter
      # Result of DynamicColumns.parse for the command input.
      attr_reader :dynamic_columns
      # time_classify_* types detected by #detect_time_classify_types.
      attr_reader :time_classify_types
      # :ascending or :descending, see #parse_order.
      attr_reader :order

      # command_name: name used as the "[name]" prefix of error messages
      #               and passed to LogicalEnumerator / DynamicColumns.
      # input:        command arguments, read with input[key].
      #
      # Raises InvalidArgument when input[:order] is neither nil,
      # "ascending" nor "descending"; errors raised by LogicalEnumerator
      # or DynamicColumns propagate unchanged.
      def initialize(command_name, input)
        @command_name = command_name
        @input = input
        @temporary_tables_queue = []
        @referred_objects_queue = []
        @enumerator = LogicalEnumerator.new(command_name,
                                            @input,
                                            {:unref_immediately => true})
        initialized = false
        begin
          @filter = @input[:filter]
          @post_filter = @input[:post_filter]
          @dynamic_columns = DynamicColumns.parse("[#{command_name}]",
                                                  @input)
          @time_classify_types = detect_time_classify_types
          @order = parse_order(@input, :order)
          initialized = true
        ensure
          # A caller never receives a half-built context, so it cannot call
          # #close on it. Release the enumerator here when a step above
          # raised; the original exception keeps propagating.
          @enumerator.unref unless initialized
        end
      end

      # Temporary tables of the newest frame, or nil before the first #push.
      def temporary_tables
        @temporary_tables_queue.last
      end

      # Referred objects of the newest frame, or nil before the first #push.
      def referred_objects
        @referred_objects_queue.last
      end

      # Starts a new, empty frame.
      def push
        @temporary_tables_queue << []
        @referred_objects_queue << []
      end

      # Releases the oldest frame: closes its temporary tables, then unrefs
      # its referred objects. Does nothing when there is no frame.
      def shift
        # Take both halves of the frame before releasing anything so that
        # the two queues stay in step even when a release step raises.
        temporary_tables = @temporary_tables_queue.shift
        referred_objects = @referred_objects_queue.shift
        return if temporary_tables.nil?

        begin
          temporary_tables.each(&:close)
          temporary_tables.clear
        ensure
          # Runs even when closing a table raised, so the references held
          # by this frame are not leaked.
          referred_objects.each(&:unref)
          referred_objects.clear
        end
      end

      # Releases every remaining frame (oldest first) and the enumerator.
      def close
        shift until @temporary_tables_queue.empty?
      ensure
        # Release the enumerator even when releasing a frame raised.
        @enumerator.unref
      end

      # true only when the dynamic columns report a window function and no
      # time_classify type was detected; false otherwise.
      def need_look_ahead?
        return false unless @dynamic_columns.have_window_function?
        return false unless @time_classify_types.empty?
        true
      end

      # Converts input[name] into :ascending or :descending.
      # A missing (nil) value means :ascending.
      #
      # Raises InvalidArgument for any other value.
      def parse_order(input, name)
        order = input[name]
        case order
        when nil, "ascending"
          :ascending
        when "descending"
          :descending
        else
          message =
            "[#{@command_name}] #{name} must be " +
            "\"ascending\" or \"descending\": <#{order}>"
          raise InvalidArgument, message
        end
      end

      # Collects, in dynamic column order, the time_classify_* type of each
      # dynamic column that
      #   * is used as a window group key by a column yielded from
      #     @dynamic_columns.each_filtered,
      #   * has a value of the form time_classify_TYPE(COLUMN, ...) whose
      #     COLUMN is the enumerator's shard key name, and
      #   * has TYPE "minute", "second", "day" or "hour".
      #
      # Returns an array of type strings (empty when nothing matches).
      def detect_time_classify_types
        window_group_keys = []
        @dynamic_columns.each_filtered do |dynamic_column|
          window_group_keys.concat(dynamic_column.window_group_keys)
        end
        return [] if window_group_keys.empty?

        types = []
        @dynamic_columns.each do |dynamic_column|
          next unless window_group_keys.include?(dynamic_column.label)
          case dynamic_column.value.strip
          when /\Atime_classify_(.+?)\s*\(\s*([a-zA-Z\d_]+)\s*[,)]/
            type = $1
            column = $2
            next if column != @enumerator.shard_key_name

            case type
            when "minute", "second", "day", "hour"
              types << type
            end
          end
        end
        types
      end
    end
  end
end