1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
|
require_relative 'spec_helper'
{''=>false, ' with :preempt_async_thread Database option'=>true}.each do |desc, preempt_async_thread|
describe "async_thread_pool extension" do
before do
@db = Sequel.mock(:extensions=>'async_thread_pool', :fetch=>{:v=>1}, :keep_reference=>false, :num_async_threads=>1, :preempt_async_thread=>preempt_async_thread)
end
it 'should allow running queries in async threads' do
t = Thread.current
t2 = nil
q = Queue.new
q2 = Queue.new
@db[:test].async.all{|x| t3 = Thread.current; q2.push(x); q.pop; t2 = t3; q2.push(nil)}
t2.must_be_nil
q2.pop.must_equal(:v=>1)
q.push(nil)
q2.pop
t2.wont_be_nil
t.wont_equal t2
end
it 'should raise exceptions that occur in async threads when result is accessed' do
v = @db[:test].with_fetch(RuntimeError).async.first
proc{v.__value}.must_raise Sequel::DatabaseError
end
it 'should have proxy objects delegate all methods other than equal?, __id__, and __send__' do
v = @db[:test].async.first
v.class.must_equal Hash
(!v).must_equal false
(v == {:v=>1}).must_equal true
(v != {:v=>1}).must_equal false
v.instance_eval{__id__}.must_equal v.__value.instance_eval{__id__}
v.instance_exec{__id__}.must_equal v.__value.instance_exec{__id__}
v.__send__(:__id__).wont_equal v.__value.__send__(:__id__)
v.respond_to?(:each).must_equal true
v.__send__(:respond_to_missing?, :each).must_equal true
v = @db[:test].async.with_fetch(:v=>false).get(:v)
v.class.must_equal FalseClass
(!v).must_equal true
(v == false).must_equal true
(v != false).must_equal false
v.respond_to?(:each).must_equal false
v.__send__(:respond_to_missing?, :each).must_equal false
end
it 'should work when loading async_thread_pool extension after already loaded' do
@db.extension(:async_thread_pool)
@db[:test].async.first.must_equal(:v=>1)
end
it 'should support sync methods on async datasets to not use an async thread or proxy object' do
t = Thread.current
t2 = nil
v = @db[:test].async.sync.all{|x| t2 = Thread.current}
(Array === v).must_equal true
t2.must_equal t
end
it 'should support async loading with proxy objects on all dataset action and enumerable methods' do
ds = @db[:test].async.with_autoid(1)
ds.<<(:v=>1).__value.must_be_kind_of Sequel::Dataset
ds.each{}.__value.must_be_kind_of Sequel::Dataset
ds.fetch_rows('foo'){}.__value.must_be_nil
ds.import([:v], [[1]]).__value
ds.multi_insert([{:v=>1}]).__value
ds.order(:v).paged_each{}.__value.must_be_kind_of Sequel::Dataset
ds.where_each(:v){}.__value.must_be_kind_of Sequel::Dataset
ds.truncate.__value.must_be_nil
@db.sqls.must_equal [
"INSERT INTO test (v) VALUES (1)",
"SELECT * FROM test",
"foo",
"INSERT INTO test (v) VALUES (1)",
"INSERT INTO test (v) VALUES (1)",
"BEGIN", "SELECT * FROM test ORDER BY v LIMIT 1000 OFFSET 0", "COMMIT",
"SELECT * FROM test WHERE v",
"TRUNCATE TABLE test",
]
ds[:v].__value.must_equal(:v=>1)
ds.all.__value.must_equal [{:v=>1}]
ds.as_hash(:v, :v).__value.must_equal(1=>1)
ds.avg(:v).__value.must_equal(1)
ds.count.__value.must_equal(1)
ds.columns.__value.must_equal []
ds.columns!.__value.must_equal []
ds.delete.__value.must_equal 0
ds.empty?.__value.must_equal false
ds.first.__value.must_equal(:v=>1)
ds.first!.__value.must_equal(:v=>1)
ds.get(:v).__value.must_equal 1
ds.insert.__value.must_equal 2
ds.order(:v).last.__value.must_equal(:v=>1)
ds.max(:v).__value.must_equal 1
ds.min(:v).__value.must_equal 1
ds.select_hash(:v, :v).__value.must_equal(1=>1)
ds.select_hash_groups(:v, :v).__value.must_equal(1=>[1])
ds.select_map(:v).__value.must_equal([1])
ds.select_order_map(:v).__value.must_equal([1])
ds.single_record.__value.must_equal(:v=>1)
ds.single_record!.__value.must_equal(:v=>1)
ds.single_value.__value.must_equal 1
ds.single_value!.__value.must_equal 1
ds.sum(:v).__value.must_equal 1
ds.to_hash(:v).__value.must_equal(1=>{:v=>1})
ds.to_hash_groups(:v).__value.must_equal(1=>[{:v=>1}])
ds.update(:v=>1).__value.must_equal 0
ds.where_all(:v).__value.must_equal [{:v=>1}]
ds.where_single_value(:v).__value.must_equal 1
ds.all?.__value.must_equal true
ds.any?.__value.must_equal true
ds.drop(0).__value.must_equal [{:v=>1}]
ds.entries.__value.must_equal [{:v=>1}]
ds.grep_v(//).__value.must_equal [{:v=>1}] if RUBY_VERSION >= '2.3'
ds.include?(:v=>1).__value.must_equal true
ds.inject{}.__value.must_equal(:v=>1)
ds.member?(:v=>1).__value.must_equal true
ds.minmax.__value.must_equal([{:v=>1}, {:v=>1}])
ds.none?.__value.must_equal false
ds.one?.__value.must_equal true
ds.reduce{}.__value.must_equal(:v=>1)
ds.sort.__value.must_equal [{:v=>1}]
ds.take(1).__value.must_equal [{:v=>1}]
ds.tally.__value.must_equal({:v=>1}=>1) if RUBY_VERSION >= '2.7'
ds.to_a.__value.must_equal [{:v=>1}]
ds.to_h{|x| [x[:v], x]}.__value.must_equal(1=>{:v=>1}) if RUBY_VERSION >= '2.6'
ds.uniq.__value.must_equal [{:v=>1}] if RUBY_VERSION >= '2.4'
ds.zip.__value.must_equal [[{:v=>1}]]
ds.collect{|x| x}.__value.must_equal [{:v=>1}]
ds.collect_concat{|x| x}.__value.must_equal [{:v=>1}]
ds.detect{true}.__value.must_equal(:v=>1)
ds.drop_while{false}.__value.must_equal [{:v=>1}]
ds.each_with_object(0){|x| x}.__value.must_equal 0
ds.filter_map{|x| x}.__value.must_equal [{:v=>1}] if RUBY_VERSION >= '2.7'
ds.find{true}.__value.must_equal(:v=>1)
ds.find_all{true}.__value.must_equal [{:v=>1}]
ds.find_index{true}.__value.must_equal 0
ds.flat_map{|x| x}.__value.must_equal [{:v=>1}]
ds.max_by{}.__value.must_equal(:v=>1)
ds.min_by{}.__value.must_equal(:v=>1)
ds.minmax_by{}.__value.must_equal [{:v=>1}, {:v=>1}]
ds.partition{true}.__value.must_equal [[{:v=>1}], []]
ds.reject{false}.__value.must_equal [{:v=>1}]
ds.sort_by{}.__value.must_equal [{:v=>1}]
ds.take_while{true}.__value.must_equal [{:v=>1}]
@db.sqls
if RUBY_VERSION >= '3.1'
ds.each_cons(1){}.__value.must_be_kind_of Sequel::Dataset
ds.each_slice(1){}.__value.must_be_kind_of Sequel::Dataset
else
ds.each_cons(1){}.__value.must_be_nil
ds.each_slice(1){}.__value.must_be_nil
end
ds.each_entry{}.__value.must_be_kind_of Sequel::Dataset
ds.each_with_index{}.__value.must_be_kind_of Sequel::Dataset
ds.reverse_each{}.__value.must_be_kind_of Sequel::Dataset
@db.sqls.must_equal [
"SELECT * FROM test",
"SELECT * FROM test",
"SELECT * FROM test",
"SELECT * FROM test",
"SELECT * FROM test",
]
(Enumerator === ds.collect).must_equal true
(Enumerator === ds.collect_concat).must_equal true
(Enumerator === ds.detect).must_equal true
(Enumerator === ds.drop_while).must_equal true
(Enumerator === ds.each_cons(1)).must_equal true
(Enumerator === ds.each_entry).must_equal true
(Enumerator === ds.each_slice(1)).must_equal true
(Enumerator === ds.each_with_index).must_equal true
(Enumerator === ds.each_with_object(1)).must_equal true
(Enumerator === ds.filter_map).must_equal true if RUBY_VERSION >= '2.7'
(Enumerator === ds.find).must_equal true
(Enumerator === ds.find_all).must_equal true
(Enumerator === ds.find_index).must_equal true
(Enumerator === ds.flat_map).must_equal true
(Enumerator === ds.max_by).must_equal true
(Enumerator === ds.min_by).must_equal true
(Enumerator === ds.minmax_by).must_equal true
(Enumerator === ds.partition).must_equal true
(Enumerator === ds.reject).must_equal true
(Enumerator === ds.reverse_each).must_equal true
(Enumerator === ds.sort_by).must_equal true
(Enumerator === ds.take_while).must_equal true
(Enumerator === ds.order(:v).paged_each).must_equal true
ds.map(:v).__value.must_equal [1]
ds.map{|x| x}.__value.must_equal [{:v=>1}]
(Enumerator === ds.map).must_equal true
end
end
end
describe "async_thread_pool extension" do
before do
@db = Sequel.mock(:extensions=>'async_thread_pool', :fetch=>{:v=>1}, :keep_reference=>false, :num_async_threads=>1)
end
it 'should perform async work before returning value' do
t = Thread.current
t2 = nil
v = @db[:test].async.all{|x| t2 = Thread.current}
v.must_equal [{:v=>1}]
t2.wont_be_nil
t.wont_equal t2
v.equal?(v.to_a).must_equal false
(Array === v).must_equal false
v.__value.equal?(v.to_a).must_equal true
(Array === v.__value).must_equal true
if RUBY_VERSION >= '2.2'
v.itself.equal?(v.to_a).must_equal true
(Array === v.itself).must_equal true
end
end
it 'should not allow calling the __run_block multiple times' do
v = Sequel::Database::AsyncThreadPool::Proxy.new{1}
v.__send__(:__run_block)
proc{v.__send__(:__run_block)}.must_raise Sequel::Error
end
it 'should not allow creating proxy objects without a block' do
proc{Sequel::Database::AsyncThreadPool::Proxy.new}.must_raise Sequel::Error
end
end
describe "async_thread_pool extension with :preempt_async_thread Database option" do
before do
@db = Sequel.mock(:extensions=>'async_thread_pool', :fetch=>{:v=>1}, :keep_reference=>false, :num_async_threads=>1, :preempt_async_thread=>true)
end
it 'should allow preempting async threads' do
t = Thread.current
t2 = nil
t4 = nil
q = Queue.new
q2 = Queue.new
@db[:test].async.all{|x| t3 = Thread.current; q2.push(x); q.pop; t2 = t3; q2.push(nil)}
t2.must_be_nil
q2.pop.must_equal(:v=>1)
@db[:test].async.all{|x| t4 = Thread.current}.__value
t4.must_equal t
q.push(nil)
q2.pop
t2.wont_be_nil
t.wont_equal t2
end
end
describe "async_thread_pool extension" do
it "should raise an error if trying to load the async_thread_pool extension into a single connection pool" do
db = Sequel.mock(:keep_reference=>false, :single_threaded=>true)
proc{db.extension(:async_thread_pool)}.must_raise Sequel::Error
end
it "should use :num_async_threads as size of async thread pool" do
3.times do |i|
Sequel.mock(:extensions=>'async_thread_pool', :num_async_threads=>i+1, :max_connections=>4).instance_variable_get(:@async_thread_pool).size.must_equal(i+1)
end
end
it "should use :max_connections as size of async thread pool if :num_async_threads is not given" do
3.times do |i|
Sequel.mock(:extensions=>'async_thread_pool', :max_connections=>i+1).instance_variable_get(:@async_thread_pool).size.must_equal(i+1)
end
end
it "should use 4 as size of async thread pool if :num_async_threads and :max_connections is not given" do
Sequel.mock(:extensions=>'async_thread_pool').instance_variable_get(:@async_thread_pool).size.must_equal 4
end
it "should raise if the number of async threads is not positive" do
proc{Sequel.mock(:extensions=>'async_thread_pool', :num_async_threads=>0)}.must_raise Sequel::Error
end
end
|