File: promises_spec.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; ansic: 288; makefile: 9; sh: 6
file content (799 lines) | stat: -rw-r--r-- 28,015 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
require 'concurrent/edge/promises'
require 'thread'

RSpec.describe 'Concurrent::Promises' do

  include Concurrent::Promises::FactoryMethods

  describe 'chain_resolvable' do
    it 'event' do
      b = resolvable_event
      a = resolvable_event.chain_resolvable(b)
      a.resolve
      expect(b).to be_resolved
    end

    it 'future' do
      b = resolvable_future
      a = resolvable_future.chain_resolvable(b)
      a.fulfill :val
      expect(b).to be_resolved
      expect(b.value).to eq :val
    end
  end

  describe '.future' do
    it 'executes' do
      future = future { 1 + 1 }
      expect(future.value!).to eq 2

      future = fulfilled_future(1).then { |v| v + 1 }
      expect(future.value!).to eq 2

      future = future(1, 2, &-> v { v })
      expect { future.value! }.to raise_error ArgumentError, /wrong number of arguments/

      future = fulfilled_future(1).then(2, &-> v { v })
      expect { future.value! }.to raise_error ArgumentError, /wrong number of arguments/
    end

    it 'executes with args' do
      future = future(1, 2, &:+)
      expect(future.value!).to eq 3

      future = fulfilled_future(1).then(1) { |v, a| v + 1 }
      expect(future.value!).to eq 2
    end
  end

  describe '.delay' do

    def behaves_as_delay(delay, value)
      expect(delay.resolved?).to eq false
      expect(delay.value!).to eq value
    end

    specify do
      behaves_as_delay delay { 1 + 1 }, 2
      behaves_as_delay fulfilled_future(1).delay.then { |v| v + 1 }, 2
      behaves_as_delay delay(1) { |a| a + 1 }, 2
      behaves_as_delay fulfilled_future(1).delay.then { |v| v + 1 }, 2
    end
  end

  describe '.schedule' do
    it 'scheduled execution' do
      start  = Time.now.to_f
      queue  = Queue.new
      future = schedule(0.1) { 1 + 1 }.then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue }

      expect(future.value!).to eq queue
      expect(queue.pop).to eq 2
      expect(queue.pop).to be >= 0.09

      start  = Time.now.to_f
      queue  = Queue.new
      future = resolved_event.
          schedule(0.1).
          then { 1 }.
          then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue }

      expect(future.value!).to eq queue
      expect(queue.pop).to eq 1
      expect(queue.pop).to be >= 0.09
    end

    it 'scheduled execution in graph' do
      start  = Time.now.to_f
      queue  = Queue.new
      future = future { sleep 0.1; 1 }.
          schedule(0.1).
          then { |v| v + 1 }.
          then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue }

      expect(future.value!).to eq queue
      expect(queue.pop).to eq 2
      expect(queue.pop).to be >= 0.09

      scheduled = resolved_event.schedule(0.1)
      expect(scheduled.resolved?).to be_falsey
      scheduled.wait
      expect(scheduled.resolved?).to be_truthy
    end

  end

  describe '.event' do
    specify do
      resolvable_event = resolvable_event()
      one              = resolvable_event.chain(1) { |arg| arg }
      join             = zip(resolvable_event).chain { 1 }
      expect(one.resolved?).to be false
      resolvable_event.resolve
      expect(one.value!).to eq 1
      expect(join.wait.resolved?).to be true
    end
  end

  describe '.future without block' do
    specify do
      resolvable_future = resolvable_future()
      one               = resolvable_future.then(&:succ)
      join              = zip_futures(resolvable_future).then { |v| v }
      expect(one.resolved?).to be false
      resolvable_future.fulfill 0
      expect(one.value!).to eq 1
      expect(join.wait!.resolved?).to be true
      expect(join.value!).to eq 0
    end
  end

  describe '.any_resolved' do
    it 'continues on first result' do
      f1 = resolvable_future
      f2 = resolvable_future
      f3 = resolvable_future

      any1 = any_resolved_future(f1, f2)
      any2 = f2 | f3

      f1.fulfill 1
      f2.reject StandardError.new

      expect(any1.value!).to eq 1
      expect(any2.reason).to be_a_kind_of StandardError
    end
  end

  describe '.any_fulfilled' do
    it 'continues on first result' do
      f1 = resolvable_future
      f2 = resolvable_future

      any = any_fulfilled_future(f1, f2)

      f1.reject StandardError.new
      f2.fulfill :value

      expect(any.value!).to eq :value
    end
  end

  describe '.zip' do
    it 'waits for all results' do
      a = future { 1 }
      b = future { 2 }
      c = future { 3 }

      z1 = a & b
      z2 = zip a, b, c
      z3 = zip a
      z4 = zip

      expect(z1.value!).to eq [1, 2]
      expect(z2.value!).to eq [1, 2, 3]
      expect(z3.value!).to eq [1]
      expect(z4.value!).to eq []

      q = Queue.new
      z1.then { |*args| q << args }
      # first is an array because it is zipping so 2 arguments
      expect(q.pop).to eq [1, 2]

      z1.then { |*args| args }.then { |*args| q << args }
      # after then it is again just one argument
      expect(q.pop).to eq [[1, 2]]

      fulfilled_future([1, 2]).then { |*args| q << args }
      expect(q.pop).to eq [[1, 2]]

      z1.then { |a1, b1, c1| q << [a1, b1, c1] }
      expect(q.pop).to eq [1, 2, nil]

      z2.then { |a1, b1, c1| q << [a1, b1, c1] }
      expect(q.pop).to eq [1, 2, 3]

      z3.then { |a1| q << a1 }
      expect(q.pop).to eq 1

      z3.then { |*as| q << as }
      expect(q.pop).to eq [1]

      z4.then { |a1| q << a1 }
      expect(q.pop).to eq nil

      z4.then { |*as| q << as }
      expect(q.pop).to eq []

      expect(z1.then { |a1, b1| a1 + b1 }.value!).to eq 3
      expect(z1.then { |a1, b1| a1 + b1 }.value!).to eq 3
      expect(z1.then(&:+).value!).to eq 3
      expect(z2.then { |a1, b1, c1| a1 + b1 + c1 }.value!).to eq 6

      expect(future { 1 }.delay).to be_a_kind_of Concurrent::Promises::Future
      expect(future { 1 }.delay.wait!).to be_resolved
      expect(resolvable_event.resolve.delay).to be_a_kind_of Concurrent::Promises::Event
      expect(resolvable_event.resolve.delay.wait).to be_resolved

      a = future { 1 }
      b = future { raise 'b' }
      c = future { raise 'c' }

      zip(a, b, c).chain { |*args| q << args }
      expect(q.pop.flatten.map(&:class)).to eq [FalseClass, 0.class, NilClass, NilClass, NilClass, RuntimeError, RuntimeError]
      zip(a, b, c).rescue { |*args| q << args }
      expect(q.pop.map(&:class)).to eq [NilClass, RuntimeError, RuntimeError]

      expect(zip.wait(0.1)).to eq true
    end

    context 'when a future raises an error' do

      let(:a_future) { future { raise 'error' } }

      it 'raises a concurrent error' do
        expect { zip(a_future).value! }.to raise_error(StandardError, 'error')
      end

      context 'when deeply nested' do
        it 'raises the original error' do
          expect { zip(zip(a_future)).value! }.to raise_error(StandardError, 'error')
        end
      end
    end
  end

  describe '.zip_events' do
    it 'waits for all and returns event' do
      a = fulfilled_future 1
      b = rejected_future :any
      c = resolvable_event.resolve

      z2 = zip_events a, b, c
      z3 = zip_events a
      z4 = zip_events

      expect(z2.resolved?).to be_truthy
      expect(z3.resolved?).to be_truthy
      expect(z4.resolved?).to be_truthy
    end
  end

  describe '.rejected_future' do
    it 'raises the correct error when passed an unraised error' do
      f = rejected_future(StandardError.new('boom'))
      expect { f.value! }.to raise_error(StandardError, 'boom')
    end
  end

  describe 'Future' do
    it 'has sync and async callbacks' do
      callbacks_tester = ->(event_or_future) do
        queue     = Queue.new
        push_args = -> *args { queue.push args }

        event_or_future.on_resolution!(&push_args)
        event_or_future.on_resolution!(1, &push_args)
        if event_or_future.is_a? Concurrent::Promises::Future
          event_or_future.on_fulfillment!(&push_args)
          event_or_future.on_fulfillment!(2, &push_args)
          event_or_future.on_rejection!(&push_args)
          event_or_future.on_rejection!(3, &push_args)
        end

        event_or_future.on_resolution(&push_args)
        event_or_future.on_resolution(4, &push_args)
        if event_or_future.is_a? Concurrent::Promises::Future
          event_or_future.on_fulfillment(&push_args)
          event_or_future.on_fulfillment(5, &push_args)
          event_or_future.on_rejection(&push_args)
          event_or_future.on_rejection(6, &push_args)
        end
        event_or_future.on_resolution_using(:io, &push_args)
        event_or_future.on_resolution_using(:io, 7, &push_args)
        if event_or_future.is_a? Concurrent::Promises::Future
          event_or_future.on_fulfillment_using(:io, &push_args)
          event_or_future.on_fulfillment_using(:io, 8, &push_args)
          event_or_future.on_rejection_using(:io, &push_args)
          event_or_future.on_rejection_using(:io, 9, &push_args)
        end

        event_or_future.wait
        ::Array.new(event_or_future.is_a?(Concurrent::Promises::Future) ? 12 : 6) { queue.pop }
      end

      callback_results = callbacks_tester.call(fulfilled_future(:v))
      expect(callback_results).to contain_exactly([true, :v, nil],
                                                  [true, :v, nil, 1],
                                                  [:v],
                                                  [:v, 2],
                                                  [true, :v, nil],
                                                  [true, :v, nil, 4],
                                                  [:v],
                                                  [:v, 5],
                                                  [true, :v, nil],
                                                  [true, :v, nil, 7],
                                                  [:v],
                                                  [:v, 8])

      err              = StandardError.new 'boo'
      callback_results = callbacks_tester.call(rejected_future(err))
      expect(callback_results).to contain_exactly([false, nil, err],
                                                  [false, nil, err, 1],
                                                  [err],
                                                  [err, 3],
                                                  [false, nil, err],
                                                  [false, nil, err, 4],
                                                  [err],
                                                  [err, 6],
                                                  [false, nil, err],
                                                  [false, nil, err, 7],
                                                  [err],
                                                  [err, 9])

      callback_results = callbacks_tester.call(resolved_event)
      expect(callback_results).to contain_exactly([], [1], [], [4], [], [7])
    end

    methods_with_timeout = { wait:   false,
                             wait!:  false,
                             value:  nil,
                             value!: nil,
                             reason: nil,
                             result: nil }
    methods_with_timeout.each do |method_with_timeout, timeout_value|
      it "#{ method_with_timeout } supports setting timeout" do
        start_latch = Concurrent::CountDownLatch.new
        end_latch   = Concurrent::CountDownLatch.new

        future = future do
          start_latch.count_down
          end_latch.wait(0.2)
        end

        expect(start_latch.wait(0.1)).to eq true
        expect(future).not_to be_resolved
        expect(future.send(method_with_timeout, 0.01)).to eq timeout_value
        expect(future).not_to be_resolved

        end_latch.count_down
        expect(future.value!).to eq true
      end
    end

    it 'chains' do
      future0 = future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR
      future1 = future0.then_on(:fast) { raise 'boo' } # executed on IO_EXECUTOR
      future2 = future1.then { |v| v + 1 } # will reject with 'boo' error, executed on default FAST_EXECUTOR
      future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR
      future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR
      future5 = future3.with_default_executor(:fast) # connects new future with different executor, the new future is resolved when future3 is
      future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5
      future7 = future0 & future3
      future8 = future0.rescue { raise 'never happens' } # future0 fulfills so future8'll have same value as future 0

      futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8]
      futures.each(&:wait)

      table = futures.each_with_index.map do |f, i|
        '%5i %7s %10s %6s %4s %6s' % [i, f.fulfilled?, f.value, f.reason,
                                      (f.promise.executor if f.promise.respond_to?(:executor)),
                                      f.default_executor]
      end.unshift('index success      value reason pool d.pool')

      expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip
        |index success      value reason pool d.pool
        |    0    true          3          io     io
        |    1   false               boo fast     io
        |    2   false               boo   io     io
        |    3    true        boo          io     io
        |    4    true       true          io     io
        |    5    true        boo               fast
        |    6    true        Boo        fast   fast
        |    7    true [3, "boo"]                 io
        |    8    true          3          io     io
      TABLE
    end

    it 'chains with correct arguments' do
      heads   = [future { 1 },
                 future { [2, 3] },
                 fulfilled_future(4),
                 fulfilled_future([5, 6])]
      results = [1,
                 [2, 3],
                 4,
                 [5, 6]]
      heads.each_with_index do |head, i|
        expect(head.then { |a| a }.value!).to eq results[i]
        expect(head.then { |a, b| [a, b].compact }.value!).to eq (results[i].is_a?(Array) ? results[i] : [results[i]])
        expect(head.then { |*a| a }.value!).to eq [results[i]]
      end
    end

    it 'constructs promise like tree' do
      # if head of the tree is not constructed with #future but with #delay it does not start execute,
      # it's triggered later by calling wait or value on any of the dependent futures or the delay itself
      three = (head = delay { 1 }).then { |v| v.succ }.then(&:succ)
      four  = three.delay.then(&:succ)

      # meaningful to_s and inspect defined for Future and Promise
      expect(head.to_s).to match(/#<Concurrent::Promises::Future:0x[\da-f]+ pending>/)
      expect(head.inspect).to(
          match(/#<Concurrent::Promises::Future:0x[\da-f]+ pending>/))

      # evaluates only up to three, four is left unevaluated
      expect(three.value!).to eq 3
      expect(four).not_to be_resolved

      expect(four.value!).to eq 4

      # futures hidden behind two delays trigger evaluation of both
      double_delay = delay { 1 }.delay.then(&:succ)
      expect(double_delay.value!).to eq 2
    end

    it 'allows graphs' do
      head    = future { 1 }
      branch1 = head.then(&:succ)
      branch2 = head.then(&:succ).delay.then(&:succ)
      results = [
          zip(branch1, branch2).then { |b1, b2| b1 + b2 },
          branch1.zip(branch2).then { |b1, b2| b1 + b2 },
          (branch1 & branch2).then { |b1, b2| b1 + b2 }]

      Thread.pass until branch1.resolved?
      expect(branch1).to be_resolved
      expect(branch2).not_to be_resolved

      expect(results.map(&:value)).to eq [5, 5, 5]
      expect(zip(branch1, branch2).value!).to eq [2, 3]
    end

    describe '#flat' do
      it 'returns value of inner future' do
        f = future { future { 1 } }.flat.then(&:succ)
        expect(f.value!).to eq 2
      end

      it 'propagates rejection of inner future' do
        err = StandardError.new('boo')
        f   = future { rejected_future(err) }.flat
        expect(f.reason).to eq err
      end

      it 'it propagates rejection of the future which was suppose to provide inner future' do
        f = future { raise 'boo' }.flat
        expect(f.reason.message).to eq 'boo'
      end

      it 'rejects if inner value is not a future' do
        f = future { 'boo' }.flat
        expect(f.reason).to be_an_instance_of TypeError
      end

      it 'accepts inner event' do
        f = future { resolved_event }.flat
        expect(f.result).to eq [true, nil, nil]
      end

      it 'propagates requests for values to delayed futures' do
        expect(future { delay { 1 } }.flat.value!(0.1)).to eq 1
        expect(::Array.new(3) { |i| Concurrent::Promises.delay { i } }.
            inject { |a, b| a.then { b }.flat }.value!(0.2)).to eq 2
      end

      it 'has shortcuts' do
        expect(fulfilled_future(1).then_flat { |v| future(v) { v + 1 } }.value!).to eq 2
        expect(fulfilled_future(1).then_flat_event { |v| resolved_event }.wait.resolved?).to eq true
        expect(fulfilled_future(1).then_flat_on(:fast) { |v| future(v) { v + 1 } }.value!).to eq 2
      end
    end

    it 'resolves future when Exception raised' do
      message = 'reject by an Exception'
      future  = future { raise Exception, message }
      expect(future.wait(0.1)).to eq true
      future.wait
      expect(future).to be_resolved
      expect(future).to be_rejected

      expect(future.reason).to be_instance_of Exception
      expect(future.result).to be_instance_of Array
      expect(future.value).to be_nil
      expect { future.value! }.to raise_error(Exception, message)
    end

    it 'runs' do
      body = lambda do |v|
        v += 1
        v < 5 ? future(v, &body) : v
      end
      expect(future(0, &body).run.value!).to eq 5

      body = lambda do |v|
        v += 1
        v < 5 ? future(v, &body) : raise(v.to_s)
      end
      expect(future(0, &body).run.reason.message).to eq '5'

      body = lambda do |v|
        v += 1
        v < 5 ? [future(v, &body)] : v
      end
      expect(future(0, &body).run(-> v { v.first if v.is_a? Array }).value!).to eq 5
    end

    it 'can be risen when rejected' do
      strip_methods = -> backtrace do
        backtrace.map do |line|
          /^.*:\d+:in/.match(line)[0] rescue line
        end
      end

      future    = rejected_future TypeError.new
      backtrace = caller; exception = (raise future rescue $!)
      expect(exception).to be_a TypeError
      expect(strip_methods[backtrace] - strip_methods[exception.backtrace]).to be_empty

      exception = TypeError.new
      exception.set_backtrace(first_backtrace = %W[/a /b /c])
      future    = rejected_future exception
      backtrace = caller; exception = (raise future rescue $!)
      expect(exception).to be_a TypeError
      expect(strip_methods[first_backtrace + backtrace] - strip_methods[exception.backtrace]).to be_empty

      future    = rejected_future(TypeError.new) & rejected_future(TypeError.new)
      backtrace = caller; exception = (raise future rescue $!)
      expect(exception).to be_a Concurrent::MultipleErrors
      expect(strip_methods[backtrace] - strip_methods[exception.backtrace]).to be_empty
    end
  end

  describe 'ResolvableEvent' do
    specify "#wait" do
      event = resolvable_event
      expect(event.wait(0, false)).to be_falsey
      expect(event.wait(0, true)).to be_falsey
      expect(event.wait).to eq event
      expect(event.wait(0, false)).to be_truthy
      expect(event.wait(0, true)).to be_truthy
    end

    specify "reservation" do
      event = resolvable_event
      expect(event.reserve).to be_truthy
      expect(event.pending?).to be_truthy
      expect(event.state).to eq :pending
      expect(event.resolve false).to be_falsey
      expect(event.resolve true, true).to be_truthy
    end
  end

  describe 'ResolvableFuture' do
    specify "#wait" do
      future = resolvable_future
      expect(future.wait(0)).to be_falsey
      expect(future.wait(0, [true, :v, nil])).to be_falsey
      expect(future.wait).to eq future
      expect(future.wait(0, nil)).to be_truthy
      expect(future.wait(0, [true, :v, nil])).to be_truthy
    end

    specify "#wait!" do
      future = resolvable_future
      expect(future.wait!(0)).to be_falsey
      expect(future.wait!(0, [true, :v, nil])).to be_falsey
      expect(future.wait!).to eq future
      expect(future.wait!(0, nil)).to be_truthy
      expect(future.wait!(0, [true, :v, nil])).to be_truthy

      future = resolvable_future
      expect(future.wait!(0)).to be_falsey
      expect(future.wait!(0, [false, nil, RuntimeError.new])).to be_falsey
      expect { future.wait! }.to raise_error RuntimeError
    end

    specify "#value" do
      future = resolvable_future
      expect(future.value(0)).to eq nil
      expect(future.value(0, :timeout, [true, :v, nil])).to eq :timeout
      expect(future.value).to eq :v
      expect(future.value(0)).to eq :v
      expect(future.value(0, :timeout, [true, :v, nil])).to eq :v
    end

    specify "#value!" do
      future = resolvable_future
      expect(future.value!(0)).to eq nil
      expect(future.value!(0, :timeout, [true, :v, nil])).to eq :timeout
      expect(future.value!).to eq :v
      expect(future.value!(0, :timeout, nil)).to eq :v
      expect(future.value!(0, :timeout, [true, :v, nil])).to eq :v

      future = resolvable_future
      expect(future.wait!(0)).to be_falsey
      expect(future.wait!(0, [false, nil, RuntimeError.new])).to be_falsey
      expect { future.wait! }.to raise_error RuntimeError
    end

    specify "#reason" do
      future = resolvable_future
      expect(future.reason(0)).to eq nil
      expect(future.reason(0, :timeout, [false, nil, :err])).to eq :timeout
      expect(future.reason).to eq :err
      expect(future.reason(0)).to eq :err
      expect(future.reason(0, :timeout, [false, nil, :err])).to eq :err
    end

    specify "result" do
      future = resolvable_future
      expect(future.result(0)).to eq nil
      expect(future.result(0, [true, :v, nil])).to be_falsey
      expect(future.result).to eq [true, :v, nil]
      expect(future.result(0)).to eq [true, :v, nil]
      expect(future.result(0, [true, :v, nil])).to eq [true, :v, nil]
    end

    specify "reservation" do
      future = resolvable_future
      expect(future.reserve).to be_truthy
      expect(future.pending?).to be_truthy
      expect(future.state).to eq :pending
      expect(future.resolve true, :value, nil, false).to be_falsey
      expect(future.fulfill :value, false).to be_falsey
      expect(future.reject :err, false).to be_falsey
      expect { future.resolve true, :value, nil }.to raise_error(Concurrent::MultipleAssignmentError)
      expect(future.resolve true, :value, nil, false, true).to be_truthy

      future = resolvable_future
      expect(future.reserve).to be_truthy
      expect(future.fulfill :value, false, true).to be_truthy

      future = resolvable_future
      expect(future.reserve).to be_truthy
      expect(future.reject :err, false, true).to be_truthy
    end

    specify "atomic_resolution" do
      future1 = resolvable_future
      future2 = resolvable_future

      expect(Concurrent::Promises::Resolvable.
          atomic_resolution(future1 => [true, :v, nil],
                            future2 => [false, nil, :err])).to eq true
      expect(future1.fulfilled?).to be_truthy
      expect(future2.rejected?).to be_truthy

      future1 = resolvable_future
      future2 = resolvable_future.fulfill :val

      expect(Concurrent::Promises::Resolvable.
          atomic_resolution(future1 => [true, :v, nil],
                            future2 => [false, nil, :err])).to eq false

      expect(future1.pending?).to be_truthy
      expect(future2.fulfilled?).to be_truthy

      expect(future1.reserve).to be_truthy
      expect(future2.reserve).to be_falsey
    end
  end

  describe 'interoperability' do
    it 'with processing actor', if: !defined?(JRUBY_VERSION) do
      actor = Concurrent::Actor::Utils::AdHoc.spawn :doubler do
        -> v { v * 2 }
      end

      expect(future { 2 }.
          then_ask(actor).
          then { |v| v + 2 }.
          value!).to eq 6
    end

    if Concurrent.const_defined? :ErlangActor
      it 'with erlang actor' do
        actor = Concurrent::ErlangActor.spawn type: :on_thread do
          reply receive * 2
        end

        expect(future { 2 }.
            then_ask(actor).
            then { |v| v + 2 }.
            value!).to eq 6
      end
    end

    it 'with channel' do
      ch1 = Concurrent::Promises::Channel.new
      ch2 = Concurrent::Promises::Channel.new

      result = Concurrent::Promises::Channel.select_op([ch1, ch2])
      ch1.push 1
      expect(result.value!).to eq [ch1, 1]

      future { 1 + 1 }.then_channel_push(ch1)
      result = (Concurrent::Promises.future { '%02d' } & ch1.select_op(ch2)).
          then { |format, (_channel, value)| format format, value }
      expect(result.value!).to eq '02'
    end
  end

  specify 'zip_futures_over' do
    expect(zip_futures_over([1, 2]) { |v| v.succ }.value!).to eq [2, 3]
  end
end

RSpec.describe 'Concurrent::ProcessingActor' do
  specify do
    actor = Concurrent::ProcessingActor.act do |the_actor|
      the_actor.receive.then do |message|
        # the actor ends with message
        message
      end
    end #

    actor.tell! :a_message
    expect(actor.termination.value!).to eq :a_message

    def count(actor, count)
      # the block passed to receive is called when the actor receives the message
      actor.receive.then do |number_or_command, answer|
        # number_or_command, answer = p a
        # p number_or_command, answer

        # code which is evaluated after the number is received
        case number_or_command
        when :done
          # this will become the result (final value) of the actor
          count
        when :count
          # reply the current count
          answer.fulfill count
          # continue running
          count(actor, count)
        when Integer
          # this will call count again to set up what to do on next message, based on new state `count + numer`
          count(actor, count + number_or_command)
        end
      end
      # evaluation of count ends immediately
      # code which is evaluated before the number is received, should be empty
    end

    counter = Concurrent::ProcessingActor.act { |a| count a, 0 }
    answer  = counter.tell!(2).ask_op { |a| [:count, a] }.value!
    expect(counter.tell!(3).tell!(:done).termination.value!).to eq 5
    expect(answer.value!).to eq 2

    add_once_actor = Concurrent::ProcessingActor.act do |the_actor|
      the_actor.receive.then do |a, b, reply|
        result = a + b
        reply.fulfill result
        # terminate with result value
        result
      end
    end

    expect(add_once_actor.ask_op { |a| [1, 2, a] }.value!.value!).to eq 3
    # expect(add_once_actor.ask_operation(%w(ab cd)).reason).to be_a_kind_of RuntimeError
    expect(add_once_actor.termination.value!).to eq 3

    def pair_adder(actor)
      (actor.receive & actor.receive).then do |(value1, answer1), (value2, answer2)|
        result = value1 + value2
        answer1.fulfill result if answer1
        answer2.fulfill result if answer2
        pair_adder actor
      end
    end

    pair_adder = Concurrent::ProcessingActor.act { |a| pair_adder a }
    pair_adder.ask_op { |a| [2, a] }
    answer = pair_adder.ask_op { |a| [3, a] }.value!
    expect(answer.value!).to eq 5
    expect((pair_adder.ask_op { |a| ['a', a] }.value! & pair_adder.ask_op { |a| ['b', a] }.value!).value!).to eq %w[ab ab]
    expect((pair_adder.ask_op { |a| ['a', a] }.value! | pair_adder.ask_op { |a| ['b', a] }.value!).value!).to eq 'ab'
  end
end