1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
module Groonga
module Sharding
class LogicalCountCommand < Command
# Registers this command class under the name "logical_count" together with
# the argument names it accepts. The order of the names is kept as-is because
# the registry may treat it as significant (e.g. for positional arguments --
# confirm against Command.register).
register("logical_count",
         %w[
           logical_table
           shard_key
           min
           min_border
           max
           max_border
           filter
           post_filter
         ])
# Runs the logical_count command: counts the matching records through a
# Counter, logs the count to the query log and writes it as the command output.
#
# @param input the command arguments; passed to ShardCountContext.new.
# @return whatever writer.write returns for the count.
def run_body(input)
  count_context = ShardCountContext.new(input)
  begin
    n_records = Counter.new(count_context).execute
    query_logger.log(:size, ":", "count(#{n_records})")
    writer.write(n_records)
  ensure
    # The context is closed even when counting, logging or writing raises.
    count_context.close
  end
end
private
# Builds the cache key for one logical_count invocation.
#
# The key is the command name followed by the value of every command
# argument, each terminated by a NUL byte, and finally the cache key of the
# dynamic columns parsed from the same input.
#
# @param input the command arguments, read with input[:name].
# @return [String] the assembled key.
def cache_key(input)
  argument_names = [
    :logical_table,
    :shard_key,
    :min,
    :min_border,
    :max,
    :max_border,
    :filter,
    :post_filter,
  ]
  key = "logical_count\0"
  argument_names.each do |name|
    key << "#{input[name]}\0"
  end
  key << DynamicColumns.parse("[logical_count]", input).cache_key
  key
end
# Execution context used by the logical_count command.
# It only fixes the command name: the constructor forwards "logical_count"
# and the command input to StreamExecuteContext.
class ShardCountContext < StreamExecuteContext
# @param input the command arguments, forwarded unchanged to the superclass.
def initialize(input)
super("logical_count", input)
end
end
class Counter < StreamExecutor
# Sets up the counter for the given context.
# ShardCountExecutor is handed to StreamExecutor as the second constructor
# argument (presumably the class used to build the per-shard executors --
# confirm in StreamExecutor).
#
# @param context the execution context, forwarded unchanged to the superclass.
def initialize(context)
super(context, ShardCountExecutor)
end
# Sums the counts reported by every shard executor.
#
# @return the total of shard_executor.execute over all yielded executors.
# @raise [InvalidArgument] when each_shard_executor yields no executor at all.
def execute
  n_shards = 0
  n_records = 0
  each_shard_executor do |executor|
    n_shards += 1
    n_records += executor.execute
  end
  return n_records if n_shards > 0

  # Nothing was yielded: report which logical table/shard key was requested.
  shards = @context.enumerator
  details = [
    "[logical_count] no shard exists: ",
    "logical_table: <#{shards.logical_table}>: ",
    "shard_key: <#{shards.shard_key_name}>",
  ]
  raise InvalidArgument, details.join
end
end
class ShardCountExecutor < StreamShardExecutor
# Sets up the executor for one shard.
# The command name "logical_count" is prepended to the arguments that are
# forwarded to StreamShardExecutor.
#
# @param context the execution context, forwarded unchanged.
# @param shard the shard to count in, forwarded unchanged.
# @param shard_range the range associated with the shard, forwarded unchanged.
def initialize(context, shard, shard_range)
super("logical_count", context, shard, shard_range)
end
# Counts the matching records of this shard.
#
# When a range index was selected (@range_index is set) the count comes from
# count_n_records_in_range and no filtering is run. Otherwise the shard is
# filtered and the sizes of all result sets are added up.
#
# @return the number of matching records; 0 when filtering produced no
#   result set.
def execute
  ensure_prepared
  return count_n_records_in_range if @range_index

  ensure_filtered
  n_records = 0
  unless @filtered_result_sets.empty?
    each_result_set do |records|
      n_records += records.size
    end
  end
  n_records
end
# Decides whether counting can go through a range index.
#
# No index is used (nil is returned, and the reason is logged) when a filter
# or post filter is set, or when the cover type is :all. In every other case
# the lookup is delegated to find_range_index_raw.
#
# @return nil, or whatever find_range_index_raw returns.
def find_range_index
  if @filter || @post_filter
    log_use_range_index(false, "need filter",
                        __LINE__, __method__)
    return nil
  end

  if @cover_type == :all
    log_use_range_index(false, "covered",
                        __LINE__, __method__)
    return nil
  end

  find_range_index_raw("range index is available",
                       __LINE__, __method__)
end
# Produces the result set for this shard and appends it to
# @filtered_result_sets.
#
# * Shard fully covered and no filter: the target table itself is used,
#   unless a post filter is set and the dynamic columns report
#   have_filtered?; then the result of @target_table.select_all is used.
# * Otherwise: an expression is built from the shard key range and @filter
#   (the builder method depends on @cover_type) and the target table is
#   selected with it.
#
# Tables created here by select_all/select are also appended to
# @temporary_tables (presumably so they are released later -- confirm in
# the superclass).
#
# @param range_index not used by this implementation.
# @return @filtered_result_sets (the value of the final <<).
def execute_filter(range_index)
  if @cover_type == :all && @filter.nil?
    need_copy = @post_filter && @context.dynamic_columns.have_filtered?
    unless need_copy
      return @filtered_result_sets << @target_table
    end
    selected = @target_table.select_all
  else
    condition = Expression.create(@target_table)
    begin
      builder = RangeExpressionBuilder.new(@shard_key,
                                           @target_range)
      builder.filter = @filter
      build_methods = {
        :all => :build_all,
        :partial_min => :build_partial_min,
        :partial_max => :build_partial_max,
        :partial_min_and_max => :build_partial_min_and_max,
      }
      build_method = build_methods[@cover_type]
      # An unknown cover type builds nothing; the expression is used as created.
      builder.__send__(build_method, condition) if build_method
      selected = @target_table.select(condition)
    ensure
      condition.close
    end
  end
  @temporary_tables << selected
  @filtered_result_sets << selected
end
# Counts the records of this shard that fall inside the target range by
# walking the range index (@range_index) instead of building a filtered table.
#
# Which bounds of @target_range are applied depends on @cover_type:
# * :partial_min         - only the lower bound
# * :partial_max         - only the upper bound
# * :partial_min_and_max - both bounds
# Any other cover type leaves both bounds open.
#
# @return the result of TableCursor.open; expected to be the
#   index_cursor.count value of the innermost block (confirm that both
#   open methods return their block's value).
def count_n_records_in_range
  # Every bound starts out open. The branches below only set the bounds
  # that apply, so an unbounded side never depends on the implicit nil of a
  # local that is assigned in some other branch.
  min = nil
  min_border = nil
  max = nil
  max_border = nil
  case @cover_type
  when :partial_min
    min = @target_range.min
    min_border = @target_range.min_border
  when :partial_max
    max = @target_range.max
    max_border = @target_range.max_border
  when :partial_min_and_max
    min = @target_range.min
    min_border = @target_range.min_border
    max = @target_range.max
    max_border = @target_range.max_border
  end

  # Translate the borders into cursor flags; a nil border adds no flag.
  flags = TableCursorFlags::BY_KEY
  case min_border
  when :include
    flags |= TableCursorFlags::GE
  when :exclude
    flags |= TableCursorFlags::GT
  end
  case max_border
  when :include
    flags |= TableCursorFlags::LE
  when :exclude
    flags |= TableCursorFlags::LT
  end

  lexicon = @range_index.table
  # The lexicon is recorded in the context's referred objects (presumably so
  # that it is released together with the context -- confirm).
  @context.referred_objects << lexicon
  TableCursor.open(lexicon,
                   :min => min,
                   :max => max,
                   :flags => flags) do |table_cursor|
    IndexCursor.open(table_cursor, @range_index) do |index_cursor|
      index_cursor.count
    end
  end
end
end
end
end
end
|