1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
module Groonga
  module Sharding
    # Per-command state shared while a sharded command is executed shard
    # by shard: the shard enumerator, the filter expressions, the parsed
    # dynamic columns, the requested order, and queues of resources
    # (temporary tables and referred objects) to release per scope.
    #
    # Scopes are opened with #push and released oldest-first with #shift.
    # #close releases every remaining scope and the enumerator.
    class StreamExecuteContext
      attr_reader :enumerator
      attr_reader :filter
      attr_reader :post_filter
      attr_reader :dynamic_columns
      attr_reader :time_classify_types
      attr_reader :order

      # command_name: name used as the "[name]" tag in error messages.
      # input: command arguments; :filter, :post_filter and :order are
      #        read here, and the whole input is handed to
      #        LogicalEnumerator and DynamicColumns.parse.
      #
      # Raises InvalidArgument when :order is neither "ascending" nor
      # "descending" (nil means ascending).
      def initialize(command_name, input)
        @command_name = command_name
        @input = input
        @enumerator = LogicalEnumerator.new(command_name,
                                            @input,
                                            {:unref_immediately => true})
        initialized = false
        begin
          @filter = @input[:filter]
          @post_filter = @input[:post_filter]
          @dynamic_columns = DynamicColumns.parse("[#{command_name}]",
                                                  @input)
          @temporary_tables_queue = []
          @time_classify_types = detect_time_classify_types
          @referred_objects_queue = []
          @order = parse_order(@input, :order)
          initialized = true
        ensure
          # #close is the only other place that releases the enumerator
          # and the caller can't call it when construction fails, so
          # release it here instead of leaking it.
          @enumerator.unref unless initialized
        end
      end

      # Temporary tables registered in the newest scope (nil when no
      # scope is open).
      def temporary_tables
        @temporary_tables_queue.last
      end

      # Referred objects registered in the newest scope (nil when no
      # scope is open).
      def referred_objects
        @referred_objects_queue.last
      end

      # Opens a new, empty scope.
      def push
        @temporary_tables_queue << []
        @referred_objects_queue << []
      end

      # Releases the oldest scope: closes its temporary tables, then
      # unrefs its referred objects. Does nothing when no scope is open.
      def shift
        # Pop both queues up front so that they stay in step even when
        # closing a temporary table raises.
        temporary_tables = @temporary_tables_queue.shift
        referred_objects = @referred_objects_queue.shift
        begin
          unless temporary_tables.nil?
            temporary_tables.each(&:close)
            temporary_tables.clear
          end
        ensure
          unless referred_objects.nil?
            referred_objects.each(&:unref)
            referred_objects.clear
          end
        end
      end

      # Releases every remaining scope and then the enumerator.
      def close
        until @temporary_tables_queue.empty?
          shift
        end
        @enumerator.unref
      end

      # True only when a window function is used and no supported
      # time_classify_* window group key on the shard key was detected.
      def need_look_ahead?
        return false unless @dynamic_columns.have_window_function?
        return false unless @time_classify_types.empty?
        true
      end

      # Converts input[name] to :ascending or :descending.
      # nil is treated as ascending; any other value raises
      # InvalidArgument.
      def parse_order(input, name)
        order = input[name]
        return :ascending if order.nil?

        case order
        when "ascending"
          :ascending
        when "descending"
          :descending
        else
          message =
            "[#{@command_name}] #{name} must be " +
            "\"ascending\" or \"descending\": <#{order}>"
          raise InvalidArgument, message
        end
      end

      # Collects the time_classify_* unit ("minute", "second", "day" or
      # "hour") of each dynamic column that is used as a window group
      # key and whose first argument is the enumerator's shard key.
      # Returns [] when no window group key is used at all.
      def detect_time_classify_types
        window_group_keys = []
        @dynamic_columns.each_filtered do |dynamic_column|
          window_group_keys.concat(dynamic_column.window_group_keys)
        end
        return [] if window_group_keys.empty?

        types = []
        @dynamic_columns.each do |dynamic_column|
          next unless window_group_keys.include?(dynamic_column.label)

          case dynamic_column.value.strip
          when /\Atime_classify_(.+?)\s*\(\s*([a-zA-Z\d_]+)\s*[,)]/
            type = $1
            column = $2
            # Only classification of the shard key itself is relevant.
            next if column != @enumerator.shard_key_name

            case type
            when "minute", "second", "day", "hour"
              types << type
            end
          end
        end
        types
      end
    end
  end
end
|