| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 
 | # frozen_string_literal: false
require 'test/unit'
require 'tmpdir'
require 'timeout'
class TestThreadQueue < Test::Unit::TestCase
  Queue = Thread::Queue
  SizedQueue = Thread::SizedQueue
  def test_queue_initialized
    assert_raise(TypeError) {
      Queue.allocate.push(nil)
    }
  end
  def test_sized_queue_initialized
    assert_raise(TypeError) {
      SizedQueue.allocate.push(nil)
    }
  end
  def test_queue
    grind(5, 1000, 15, Queue)
  end
  def test_sized_queue
    grind(5, 1000, 15, SizedQueue, 1000)
  end
  def grind(num_threads, num_objects, num_iterations, klass, *args)
    from_workers = klass.new(*args)
    to_workers = klass.new(*args)
    workers = (1..num_threads).map {
      Thread.new {
        while object = to_workers.pop
          from_workers.push object
        end
      }
    }
    Thread.new {
      num_iterations.times {
        num_objects.times { to_workers.push 99 }
        num_objects.times { from_workers.pop }
      }
    }.join
    # close the queue the old way to test for backwards-compatibility
    num_threads.times { to_workers.push nil }
    workers.each { |t| t.join }
    assert_equal 0, from_workers.size
    assert_equal 0, to_workers.size
  end
  def test_sized_queue_initialize
    q = SizedQueue.new(1)
    assert_equal 1, q.max
    assert_raise(ArgumentError) { SizedQueue.new(0) }
    assert_raise(ArgumentError) { SizedQueue.new(-1) }
  end
  def test_sized_queue_assign_max
    q = SizedQueue.new(2)
    assert_equal(2, q.max)
    q.max = 1
    assert_equal(1, q.max)
    assert_raise(ArgumentError) { q.max = 0 }
    assert_equal(1, q.max)
    assert_raise(ArgumentError) { q.max = -1 }
    assert_equal(1, q.max)
    before = q.max
    q.max.times { q << 1 }
    t1 = Thread.new { q << 1 }
    sleep 0.01 until t1.stop?
    q.max = q.max + 1
    assert_equal before + 1, q.max
  ensure
    t1.join if t1
  end
  def test_queue_pop_interrupt
    q = Queue.new
    t1 = Thread.new { q.pop }
    sleep 0.01 until t1.stop?
    t1.kill.join
    assert_equal(0, q.num_waiting)
  end
  def test_queue_pop_non_block
    q = Queue.new
    assert_raise_with_message(ThreadError, /empty/) do
      q.pop(true)
    end
  end
  def test_sized_queue_pop_interrupt
    q = SizedQueue.new(1)
    t1 = Thread.new { q.pop }
    sleep 0.01 until t1.stop?
    t1.kill.join
    assert_equal(0, q.num_waiting)
  end
  def test_sized_queue_pop_non_block
    q = SizedQueue.new(1)
    assert_raise_with_message(ThreadError, /empty/) do
      q.pop(true)
    end
  end
  def test_sized_queue_push_interrupt
    q = SizedQueue.new(1)
    q.push(1)
    assert_raise_with_message(ThreadError, /full/) do
      q.push(2, true)
    end
  end
  def test_sized_queue_push_non_block
    q = SizedQueue.new(1)
    q.push(1)
    t1 = Thread.new { q.push(2) }
    sleep 0.01 until t1.stop?
    t1.kill.join
    assert_equal(0, q.num_waiting)
  end
  def test_thr_kill
    bug5343 = '[ruby-core:39634]'
    Dir.mktmpdir {|d|
      timeout = 60
      total_count = 250
      begin
        assert_normal_exit(<<-"_eom", bug5343, **{:timeout => timeout, :chdir=>d})
          #{total_count}.times do |i|
            open("test_thr_kill_count", "w") {|f| f.puts i }
            queue = Queue.new
            r, w = IO.pipe
            th = Thread.start {
              queue.push(nil)
              r.read 1
            }
            queue.pop
            th.kill
            th.join
          end
        _eom
      rescue Timeout::Error
        count = File.read("#{d}/test_thr_kill_count").to_i
        flunk "only #{count}/#{total_count} done in #{timeout} seconds."
      end
    }
  end
  def test_queue_push_return_value
    q = Queue.new
    retval = q.push(1)
    assert_same q, retval
  end
  def test_queue_clear_return_value
    q = Queue.new
    retval = q.clear
    assert_same q, retval
  end
  def test_sized_queue_clear
    # Fill queue, then test that SizedQueue#clear wakes up all waiting threads
    sq = SizedQueue.new(2)
    2.times { sq << 1 }
    t1 = Thread.new do
      sq << 1
    end
    t2 = Thread.new do
      sq << 1
    end
    t3 = Thread.new do
      Thread.pass
      sq.clear
    end
    [t3, t2, t1].each(&:join)
    assert_equal sq.length, 2
  end
  def test_sized_queue_push_return_value
    q = SizedQueue.new(1)
    retval = q.push(1)
    assert_same q, retval
  end
  def test_sized_queue_clear_return_value
    q = SizedQueue.new(1)
    retval = q.clear
    assert_same q, retval
  end
  def test_sized_queue_throttle
    q = SizedQueue.new(1)
    i = 0
    consumer = Thread.new do
      while q.pop
        i += 1
        Thread.pass
      end
    end
    nprod = 4
    npush = 100
    producer = nprod.times.map do
      Thread.new do
        npush.times { q.push(true) }
      end
    end
    producer.each(&:join)
    q.push(nil)
    consumer.join
    assert_equal(nprod * npush, i)
  end
  def test_queue_thread_raise
    q = Queue.new
    th1 = Thread.new do
      begin
        q.pop
      rescue RuntimeError
        sleep
      end
    end
    th2 = Thread.new do
      sleep 0.1
      q.pop
    end
    sleep 0.1
    th1.raise
    sleep 0.1
    q << :s
    assert_nothing_raised(Timeout::Error) do
      Timeout.timeout(1) { th2.join }
    end
  ensure
    [th1, th2].each do |th|
      if th and th.alive?
        th.wakeup
        th.join
      end
    end
  end
  def test_dup
    bug9440 = '[ruby-core:59961] [Bug #9440]'
    q = Queue.new
    assert_raise(NoMethodError, bug9440) do
      q.dup
    end
  end
  (DumpableQueue = Queue.dup).class_eval {remove_method :marshal_dump}
  def test_dump
    bug9674 = '[ruby-core:61677] [Bug #9674]'
    q = Queue.new
    assert_raise_with_message(TypeError, /#{Queue}/, bug9674) do
      Marshal.dump(q)
    end
    sq = SizedQueue.new(1)
    assert_raise_with_message(TypeError, /#{SizedQueue}/, bug9674) do
      Marshal.dump(sq)
    end
    q = DumpableQueue.new
    assert_raise(TypeError, bug9674) do
      Marshal.dump(q)
    end
  end
  def test_close
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
      q = qcreate.call
      assert_equal false, q.closed?
      q << :something
      assert_equal q, q.close
      assert q.closed?
      assert_raise_with_message(ClosedQueueError, /closed/){q << :nothing}
      assert_equal q.pop, :something
      assert_nil q.pop
      assert_nil q.pop
      # non-blocking
      assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
    end
  end
  # test that waiting producers are woken up on close
  def close_wakeup( num_items, num_threads, &qcreate )
    raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads
    # create the Queue
    q = yield
    threads = num_threads.times.map{Thread.new{q.pop}}
    num_items.times{|i| q << i}
    # wait until queue empty
    (Thread.pass; sleep 0.01) until q.size == 0
    # close the queue so remaining threads will wake up
    q.close
    # wait for them to go away
    Thread.pass until threads.all?{|thr| thr.status == false}
    # check that they've gone away. Convert nil to -1 so we can sort and do the comparison
    expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a
    assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort
  end
  def test_queue_close_wakeup
    close_wakeup(15, 18){Queue.new}
  end
  def test_size_queue_close_wakeup
    close_wakeup(5, 8){SizedQueue.new 9}
  end
  def test_sized_queue_one_closed_interrupt
    q = SizedQueue.new 1
    q << :one
    t1 = Thread.new {
      Thread.current.report_on_exception = false
      q << :two
    }
    sleep 0.01 until t1.stop?
    q.close
    assert_raise(ClosedQueueError) {t1.join}
    assert_equal 1, q.size
    assert_equal :one, q.pop
    assert q.empty?, "queue not empty"
  end
  # make sure that shutdown state is handled properly by empty? for the non-blocking case
  def test_empty_non_blocking
    q = SizedQueue.new 3
    3.times{|i| q << i}
    # these all block cos the queue is full
    prod_threads = 4.times.map {|i|
      Thread.new {
        Thread.current.report_on_exception = false
        q << 3+i
      }
    }
    sleep 0.01 until prod_threads.all?{|thr| thr.stop?}
    items = []
    # sometimes empty? is false but pop will raise ThreadError('empty'),
    # meaning a value is not immediately available but will be soon.
    while prod_threads.any?(&:alive?) or !q.empty?
      items << q.pop(true) rescue nil
    end
    assert_join_threads(prod_threads)
    items.compact!
    assert_equal 7.times.to_a, items.sort
    assert q.empty?
  end
  def test_sized_queue_closed_push_non_blocking
    q = SizedQueue.new 7
    q.close
    assert_raise_with_message(ClosedQueueError, /queue closed/){q.push(non_block=true)}
  end
  def test_blocked_pushers
    q = SizedQueue.new 3
    prod_threads = 6.times.map do |i|
      thr = Thread.new{
        Thread.current.report_on_exception = false
        q << i
      }
      thr[:pc] = i
      thr
    end
    # wait until some producer threads have finished, and the other 3 are blocked
    sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3
    # this would ensure that all producer threads call push before close
    # sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
    q.close
    # more than prod_threads
    cons_threads = 10.times.map do |i|
      thr = Thread.new{q.pop}; thr[:pc] = i; thr
    end
    # values that came from the queue
    popped_values = cons_threads.map &:value
    # wait untl all threads have finished
    sleep 0.01 until prod_threads.find_all{|t| t.status}.count == 0
    # pick only the producer threads that got in before close
    successful_prod_threads = prod_threads.reject{|thr| thr.status == nil}
    assert_nothing_raised{ successful_prod_threads.map(&:value) }
    # the producer threads that tried to push after q.close should all fail
    unsuccessful_prod_threads = prod_threads - successful_prod_threads
    unsuccessful_prod_threads.each do |thr|
      assert_raise(ClosedQueueError){ thr.value }
    end
    assert_equal cons_threads.size, popped_values.size
    assert_equal 0, q.size
    # check that consumer threads with values match producers that called push before close
    assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort
    assert_nil q.pop
  end
  def test_deny_pushers
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
      q = qcreate[]
      synq = Queue.new
      prod_threads = 20.times.map do |i|
        Thread.new {
          synq.pop
          assert_raise(ClosedQueueError) {
            q << i
          }
        }
      end
      q.close
      synq.close # start producer threads
      prod_threads.each(&:join)
    end
  end
  # size should account for waiting pushers during shutdown
  def sized_queue_size_close
    q = SizedQueue.new 4
    4.times{|i| q << i}
    Thread.new{ q << 5 }
    Thread.new{ q << 6 }
    assert_equal 4, q.size
    assert_equal 4, q.items
    q.close
    assert_equal 6, q.size
    assert_equal 4, q.items
  end
  def test_blocked_pushers_empty
    q = SizedQueue.new 3
    prod_threads = 6.times.map do |i|
      Thread.new{
        Thread.current.report_on_exception = false
        q << i
      }
    end
    # this ensures that all producer threads call push before close
    sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
    q.close
    ary = []
    until q.empty?
      ary << q.pop
    end
    assert_equal 0, q.size
    assert_equal 3, ary.size
    ary.each{|e| assert [0,1,2,3,4,5].include?(e)}
    assert_nil q.pop
    prod_threads.each{|t|
      begin
        t.join
      rescue
      end
    }
  end
  # test thread wakeup on one-element SizedQueue with close
  def test_one_element_sized_queue
    q = SizedQueue.new 1
    t = Thread.new{ q.pop }
    q.close
    assert_nil t.value
  end
  def test_close_twice
    [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
      q = qcreate[]
      q.close
      assert_nothing_raised(ClosedQueueError){q.close}
    end
  end
  def test_queue_close_multi_multi
    q = SizedQueue.new rand(800..1200)
    count_items = rand(3000..5000)
    count_producers = rand(10..20)
    producers = count_producers.times.map do
      Thread.new do
        sleep(rand / 100)
        count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]}
      end
    end
    consumers = rand(7..12).times.map do
      Thread.new do
        count = 0
        while e = q.pop
          i, st = e
          count += 1 if i.is_a?(Integer) && st.is_a?(String)
        end
        count
      end
    end
    # No dead or finished threads, give up to 10 seconds to start running
    t = Time.now
    Thread.pass until Time.now - t > 10 || (consumers + producers).all?{|thr| thr.status =~ /\A(?:run|sleep)\z/}
    assert (consumers + producers).all?{|thr| thr.status =~ /\A(?:run|sleep)\z/}, 'no threads running'
    # just exercising the concurrency of the support methods.
    counter = Thread.new do
      until q.closed? && q.empty?
        raise if q.size > q.max
        # otherwise this exercise causes too much contention on the lock
        sleep 0.01
      end
    end
    producers.each &:join
    q.close
    # results not randomly distributed. Not sure why.
    # consumers.map{|thr| thr.value}.each do |x|
    #   assert_not_equal 0, x
    # end
    all_items_count = consumers.map{|thr| thr.value}.inject(:+)
    assert_equal count_items * count_producers, all_items_count
    # don't leak this thread
    assert_nothing_raised{counter.join}
  end
  def test_queue_with_trap
    if ENV['APPVEYOR'] == 'True' && RUBY_PLATFORM.match?(/mswin/)
      skip 'This test fails too often on AppVeyor vs140'
    end
    if RUBY_PLATFORM.match?(/mingw/)
      skip 'This test fails too often on MinGW'
    end
    assert_in_out_err([], <<-INPUT, %w(INT INT exit), [])
      q = Queue.new
      trap(:INT){
        q.push 'INT'
      }
      Thread.new{
        loop{
          Process.kill :INT, $$
        }
      }
      puts q.pop
      puts q.pop
      puts 'exit'
    INPUT
  end
  def test_fork_while_queue_waiting
    q = Queue.new
    sq = SizedQueue.new(1)
    thq = Thread.new { q.pop }
    thsq = Thread.new { sq.pop }
    Thread.pass until thq.stop? && thsq.stop?
    pid = fork do
      exit!(1) if q.num_waiting != 0
      exit!(2) if sq.num_waiting != 0
      exit!(6) unless q.empty?
      exit!(7) unless sq.empty?
      q.push :child_q
      sq.push :child_sq
      exit!(3) if q.pop != :child_q
      exit!(4) if sq.pop != :child_sq
      exit!(0)
    end
    _, s = Process.waitpid2(pid)
    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
    q.push :thq
    sq.push :thsq
    assert_equal :thq, thq.value
    assert_equal :thsq, thsq.value
    sq.push(1)
    th = Thread.new { q.pop; sq.pop }
    thsq = Thread.new { sq.push(2) }
    Thread.pass until th.stop? && thsq.stop?
    pid = fork do
      exit!(1) if q.num_waiting != 0
      exit!(2) if sq.num_waiting != 0
      exit!(3) unless q.empty?
      exit!(4) if sq.empty?
      exit!(5) if sq.pop != 1
      exit!(0)
    end
    _, s = Process.waitpid2(pid)
    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
    assert_predicate thsq, :stop?
    assert_equal 1, sq.pop
    assert_same sq, thsq.value
    q.push('restart th')
    assert_equal 2, th.value
  end if Process.respond_to?(:fork)
end
 |