if Concurrent.ruby_version :>=, 2, 1, 0
  RSpec.describe 'Concurrent' do
    describe 'ErlangActor', edge: true do
      # TODO (pitr-ch 06-Feb-2019): include constants instead
      ANY      ||= Concurrent::ErlangActor::ANY
      TIMEOUT  ||= Concurrent::ErlangActor::TIMEOUT
      And      ||= Concurrent::ErlangActor::And
      identity = -> v { v }

      shared_examples 'erlang actor' do

        specify "run to termination" do
          expect(Concurrent::ErlangActor.spawn(type: type) do
            :v
          end.terminated.value!).to eq :v
        end

        specify "run to termination with arguments" do
          expect(Concurrent::ErlangActor.
              spawn(1, 2, type: type) { |a, b| a + b }.terminated.value!).
              to eq 3
        end

        specify '#receive' do
          succ = -> v { v.succ }

          [[[:v], -> { receive }, :v],
           [[:v], -> { receive on(ANY, &identity) }, :v],
           [[:v, 1], -> { receive Numeric }, 1],
           [[:v, 1], -> { receive(Numeric, &succ) }, 2],

           [[:v], -> { receive Numeric, timeout: 0 }, nil],
           [[:v], -> { receive(Numeric, timeout: 0, &succ) }, nil],
           [[:v], -> { receive Numeric, timeout: 0, timeout_value: :timeout }, :timeout],
           [[:v], -> { receive(Numeric, timeout: 0, timeout_value: :timeout, &succ) }, :timeout],

           [[:v, 1], -> { receive Numeric, timeout: 1 }, 1],
           [[:v, 1], -> { receive(Numeric, timeout: 1, &succ) }, 2],
           [[:v, 1], -> { receive Numeric, timeout: 1, timeout_value: :timeout }, 1],
           [[:v, 1], -> { receive(Numeric, timeout: 1, timeout_value: :timeout, &succ) }, 2],

           [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 0 }, nil],
           [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 0 }, nil],
           [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 0 }, :timeout],
           [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 0 }, :timeout],

           [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 1 }, 1],
           [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 1 }, 2],
           [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 1 }, 1],
           [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 1 }, 2],
          ].each_with_index do |(messages, body, result), i|
            a = Concurrent::ErlangActor.spawn(type: type, &body)
            messages.each { |m| a.tell m }
            expect(a.terminated.value!).to eq(result), "body: #{body}"
          end
        end

        specify 'pid has name' do
          actor = Concurrent::ErlangActor.spawn(type: type, name: 'test') {}
          expect(actor.to_s).to match(/test/)
          expect(actor.inspect).to match(/test/)
        end

        specify "receives message" do
          actor = Concurrent::ErlangActor.spawn(type: type,
                                                &{ on_thread: -> { receive },
                                                   on_pool:   -> { receive on(ANY, &identity) } }.fetch(type))
          actor.tell :v
          expect(actor.terminated.value!).to eq :v
        end

        specify "receives message with matchers" do
          body  = { on_thread:
                        -> do
                          [receive(on(Symbol, &identity)),
                           receive(on(Numeric, &:succ)),
                           receive(on(Numeric, :got_it), timeout: 0, timeout_value: :nothing)]
                        end,
                    on_pool:
                        -> do
                          @arr = []
                          receive(on(Symbol) do |v1|
                            @arr.push v1
                            receive(on(Numeric) do |v2|
                              @arr << v2.succ
                              receive(on(Numeric, :got_it), on(TIMEOUT) { @arr << :nothing; @arr }, timeout: 0)
                            end)
                          end)
                        end }
          actor = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
          actor.tell 'junk'
          actor.tell 1
          actor.tell :v
          expect(actor.terminated.value!).to eq [:v, 2, :nothing]
        end

        describe "monitoring" do
          specify "(de)monitor" do
            body_receive = { on_thread:
                                 -> { receive },
                             on_pool:
                                 -> { receive { |v| v } } }

            body = { on_thread:
                         -> do
                           actor     = receive
                           reference = monitor actor
                           monitored = monitoring? reference
                           demonitor reference
                           result = [monitored, monitoring?(reference)]
                           actor.tell :finish
                           result
                         end,
                     on_pool:
                         -> do
                           receive do |actor|
                             reference = monitor actor
                             monitored = monitoring? reference
                             demonitor reference
                             result = [monitored, monitoring?(reference)]
                             actor.tell :finish
                             result
                           end
                         end }
            a1   = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            a2   = Concurrent::ErlangActor.spawn(type: type, &body_receive.fetch(type))
            a1.tell a2
            expect(a1.terminated.value!).to eq [true, false]
            expect(a2.terminated.value!).to eq :finish
          end

          specify "demonitor" do
            body = { on_thread:
                         -> do
                           actor     = receive
                           reference = monitor actor
                           monitored = monitoring? reference
                           actor.tell :done
                           actor.terminated.wait
                           demonitor = demonitor reference, :flush, :info
                           [monitored, monitoring?(reference), demonitor, receive(timeout: 0)]
                         end,
                     on_pool:
                         -> do
                           receive do |actor|
                             reference = monitor actor
                             monitored = monitoring? reference
                             actor.tell :done
                             actor.terminated.wait
                             demonitor = demonitor reference, :flush, :info
                             results   = [monitored, monitoring?(reference), demonitor]
                             receive(on(ANY) { |v| [*results, v] },
                                     on(TIMEOUT) { [*results, nil] },
                                     timeout: 0)
                           end
                         end }

            a1   = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            body = { on_thread: -> { receive },
                     on_pool:   -> { receive(&identity) } }
            a2   = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            a1.tell a2

            a1.terminated.wait
            expect(a1.terminated.value!).to eq [true, false, false, nil]
            expect(a2.terminated.value!).to eq :done
          end

          specify "demonitor should leave the down message in the inbox if it's already there" do
            body = { on_thread:
                         -> do
                           actor     = receive
                           reference = monitor actor
                           monitored = monitoring? reference
                           actor.tell :done
                           actor.terminated.wait
                           demonitor = demonitor reference, :info
                           [reference, monitored, monitoring?(reference), demonitor, receive(timeout: 0)]
                         end,
                     on_pool:
                         -> do
                           receive do |actor|
                             reference = monitor actor
                             monitored = monitoring? reference
                             actor.tell :done
                             actor.terminated.wait
                             demonitor = demonitor reference, :info
                             results   = [reference, monitored, monitoring?(reference), demonitor]
                             receive(on(ANY) { |v| [*results, v] },
                                     on(TIMEOUT) { [*results, nil] },
                                     timeout: 0)
                           end
                         end }

            a1   = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            body = { on_thread: -> { receive },
                     on_pool:   -> { receive(&identity) } }
            a2   = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            a1.tell a2

            reference, monitored, monitoring, demonitor, message = a1.terminated.value!
            expect(monitored).to eq true
            expect(monitoring).to eq false
            expect(demonitor).to eq false
            expect(message).to eq Concurrent::ErlangActor::Down.new(a2, reference, :normal)
            expect(a2.terminated.value!).to eq :done
          end

          specify "notifications 1" do
            body = { on_thread:
                         -> do
                           b   = spawn { [:done, receive] }
                           ref = monitor b
                           b.tell 42
                           [b, ref, receive]
                         end,
                     on_pool:
                         -> do
                           b   = spawn { receive on(ANY) { |v| [:done, v] } }
                           ref = monitor b
                           b.tell 42
                           receive on(ANY) { |v| [b, ref, v] }
                         end }

            a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

            b, ref, down = a.terminated.value!
            expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, :normal)
            expect(b.terminated.value!).to eq [:done, 42]
          end

          specify "notifications 2" do
            body = { on_thread:
                         -> do
                           b = spawn { :done }
                           b.terminated.wait
                           ref = monitor b
                           [b, ref, receive(timeout: 1, timeout_value: :timeout)]
                         end,
                     on_pool:
                         -> do
                           b = spawn { :done }
                           b.terminated.wait
                           ref = monitor b
                           receive(timeout: 1) { |v| [b, ref, v] }
                         end }

            a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

            b, ref, down = a.terminated.value!
            expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, Concurrent::ErlangActor::NoActor.new(b))
            expect(b.terminated.value!).to eq :done
          end

          # FIXME (pitr-ch 20-Jan-2019): test concurrent exit and monitor(), same for link
        end

        describe 'linking' do
          body_receive_test_linked = { on_thread:
                                           -> { linked?(receive) },
                                       on_pool:
                                           -> { receive { |a| linked? a } } }

          specify 'links' do
            body1 = { on_thread:
                          -> do
                            actor = receive
                            link actor
                            linked = linked? actor
                            actor.tell pid
                            linked
                          end,
                      on_pool:
                          -> do
                            receive do |actor|
                              link actor
                              linked = linked? actor
                              actor.tell pid
                              linked
                            end
                          end }

            a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type))
            a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type))

            a1.tell a2
            expect(a1.terminated.value!).to be_truthy
            expect(a2.terminated.value!).to be_truthy
          end

          specify 'unlinks' do
            body1 = { on_thread:
                          -> do
                            actor = receive
                            link actor
                            unlink actor
                            linked = linked? actor
                            actor.tell pid
                            linked
                          end,
                      on_pool:
                          -> do
                            receive do |actor|
                              link actor
                              unlink actor
                              linked = linked? actor
                              actor.tell pid
                              linked
                            end
                          end }

            a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type))
            a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type))
            a1.tell a2
            expect(a1.terminated.value!).to be_falsey
            expect(a2.terminated.value!).to be_falsey
          end

          specify 'link dead' do
            a = Concurrent::ErlangActor.spawn(type: type) do
              b = spawn { :done }
              b.terminated.wait
              link b
            end
            expect { a.terminated.value! }.to raise_error Concurrent::ErlangActor::NoActor
          end

          specify 'link dead when trapping' do
            body1 = { on_thread:
                          -> do
                            b = spawn { :done }
                            b.terminated.wait
                            sleep 0.1
                            trap
                            link b
                            [b, receive]
                          end,
                      on_pool:
                          -> do
                            b = spawn { :done }
                            b.terminated.wait
                            sleep 0.1
                            trap
                            link b
                            receive { |v| [b, v] }
                          end }

            a = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type))

            b, captured = a.terminated.value!
            expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, Concurrent::ErlangActor::NoActor.new(b))
          end


          describe 'exit/1 when linked' do
            # https://learnyousomeerlang.com/errors-and-processes#links
            specify 1 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { :ok }
                             [receive(timeout: 0.01), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { :ok }
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 0.01)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              message, b = a.terminated.value!
              expect(message).to eq nil
              expect(b.terminated.value!).to eq :ok
            end

            specify 2 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { :ok }
                             trap
                             [receive(timeout: 1), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { :ok }
                             trap
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              message, b = a.terminated.value!
              expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal)
              expect(b.terminated.value!).to eq :ok
            end

            specify 3 do
              body = { on_thread:
                           -> do
                             spawn(link: true) { terminate :boom }
                             receive(timeout: 1)
                           end,
                       on_pool:
                           -> do
                             spawn(link: true) { terminate :boom }
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.reason).to eq :boom
            end

            specify 4 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { terminate :boom }
                             trap
                             [receive(timeout: 1), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { terminate :boom }
                             trap
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              trapped_exit, b = a.terminated.value!
              expect(trapped_exit).to eq Concurrent::ErlangActor::Terminated.new(b, :boom)
              expect(b.terminated.reason).to eq :boom
            end

            specify 5 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { terminate :normal, value: :ok }
                             [receive(timeout: 0.01), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { terminate :normal, value: :ok }
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 0.01)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              message, b = a.terminated.value!
              expect(message).to eq nil
              expect(b.terminated.value!).to eq :ok
            end

            specify 6 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { terminate :normal, value: :ok }
                             trap
                             [receive(timeout: 1), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { terminate :normal, value: :ok }
                             trap
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              message, b = a.terminated.value!
              expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal)
              expect(b.terminated.value!).to eq :ok
            end

            specify 7 do
              body = { on_thread:
                           -> do
                             spawn(link: true) { raise 'err' }
                             receive(timeout: 1)
                           end,
                       on_pool:
                           -> do
                             spawn(link: true) { raise 'err' }
                             receive(timeout: 1) { |v| v }
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect { a.terminated.value! }.to raise_error(RuntimeError, 'err')
            end

            specify 8 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { raise 'err' }
                             trap
                             [receive(timeout: 1), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { raise 'err' }
                             trap
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              trapped_exit, b = a.terminated.value!
              expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated
              expect(trapped_exit.from).to eq b
              expect(trapped_exit.reason).to eq b.terminated.reason
              expect(trapped_exit.reason).to be_a RuntimeError
              expect(trapped_exit.reason.message).to eq 'err'
            end

            specify 9 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { throw :uncaught }
                             trap
                             [receive(timeout: 1), b]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { throw :uncaught }
                             trap
                             receive(on(ANY) { |v| [v, b] },
                                     on(TIMEOUT) { |v| [nil, b] },
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              trapped_exit, b = a.terminated.value!
              expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated
              expect(trapped_exit.from).to eq b
              expect(trapped_exit.reason).to eq b.terminated.reason
              expect(trapped_exit.reason).to be_a ArgumentError
              expect(trapped_exit.reason.message).to match(/uncaught throw :uncaught/)
            end
          end

          describe 'exit/2 when linked' do
            # https://learnyousomeerlang.com/errors-and-processes#links
            specify 1 do
              body = { on_thread:
                           -> do
                             terminate pid, :normal # sends the signal to mailbox
                             # TODO (pitr-ch 17-Jan-2019): does erlang require receive to process signals?
                             receive(timeout: 0.01)
                             :continued
                           end,
                       on_pool:
                           -> do
                             terminate pid, :normal # sends the signal to mailbox
                             receive(on(ANY, :continued),
                                     on(TIMEOUT, :timeout),
                                     timeout: 0.01)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.value!).to eq nil
            end

            specify 2 do
              body = { on_thread:
                           -> do
                             terminate pid, :normal
                             trap
                             receive(timeout: 0)
                           end,
                       on_pool:
                           -> do
                             terminate pid, :normal
                             trap
                             receive(on(ANY, &identity), on(TIMEOUT, nil), timeout: 0)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              captured_exit = a.terminated.value!
              expect(captured_exit).to eq Concurrent::ErlangActor::Terminated.new(a, :normal)
            end

            specify 3 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 0.01, timeout_value: :timeout }
                             terminate b, :normal
                             b
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) do
                               receive(on(ANY, :not_happening),
                                       on(TIMEOUT, :timeout),
                                       timeout: 0.01)
                             end

                             terminate b, :normal
                             b
                           end }
              a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              b = a.terminated.value!
              expect(b.terminated.value!).to eq :timeout
            end

            specify 4 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { trap; receive timeout: 1, timeout_value: :timeout }
                             terminate b, :normal
                             b
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) do
                               trap
                               receive(on(ANY, &identity),
                                       on(TIMEOUT, :timeout),
                                       timeout: 1)
                             end

                             terminate b, :normal
                             b
                           end }
              a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              b = a.terminated.value!
              expect(b.terminated.value!).to eq Concurrent::ErlangActor::Terminated.new(a, :normal)
            end

            specify 5 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 0.01; terminate :continued }
                             terminate b, :normal
                             trap
                             [b, receive(timeout: 1)]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) do
                               receive(on(ANY, :not_happening),
                                       on(TIMEOUT) { terminate :continued },
                                       timeout: 0.01)
                             end

                             terminate b, :normal
                             trap
                             receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              b, captured = a.terminated.value!
              expect(b.terminated.reason).to eq :continued
              # normal is never send from b to a back
              expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :continued)
            end

            specify 6 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 1; :done }
                             terminate b, :remote_err
                             receive timeout: 1
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) }
                             terminate b, :remote_err
                             receive(on(ANY) { |v| [b, v] },
                                     on(TIMEOUT, :timeout),
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
              a.terminated.wait
              expect(a.terminated.reason).to eq :remote_err
            end

            specify 7 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 1; :done }
                             terminate b, :remote_err
                             trap
                             [b, receive(timeout: 1)]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) }
                             terminate b, :remote_err
                             trap
                             receive(on(ANY) { |v| [b, v] },
                                     on(TIMEOUT, :timeout),
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              b, captured = a.terminated.value!
              expect(b.terminated.reason).to eq :remote_err
              expect(captured.reason).to eq :remote_err
            end

            specify 8 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 1; :done }
                             terminate b, :kill
                             receive timeout: 1
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) }
                             terminate b, :kill
                             receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.reason).to eq :killed
            end

            specify 9 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { receive timeout: 1; :done }
                             terminate b, :kill
                             trap
                             [b, receive(timeout: 1)]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) }
                             terminate b, :kill
                             trap
                             receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1)
                           end }

              a           = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
              b, captured = a.terminated.value!
              expect(b.terminated.reason).to eq :killed
              expect(captured.reason).to eq :killed
            end

            specify 10 do
              body = { on_thread:
                           -> do
                             terminate pid, :kill
                             receive timeout: 0
                           end,
                       on_pool:
                           -> do
                             terminate pid, :kill
                             receive(on(ANY, :continued), on(TIMEOUT, :timeout), timeout: 0)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.reason).to eq :killed
            end

            specify 11 do
              body = { on_thread:
                           -> do
                             terminate pid, :kill
                             trap
                             receive timeout: 0
                           end,
                       on_pool:
                           -> do
                             terminate pid, :kill
                             trap
                             receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 0)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.reason).to eq :killed
            end

            # explained in
            # http://erlang.org/pipermail/erlang-questions/2009-October/047241.html

            specify 12 do
              body = { on_thread:
                           -> do
                             spawn(link: true) { terminate :kill }
                             receive timeout: 1
                           end,
                       on_pool:
                           -> do
                             spawn(link: true) { terminate :kill }
                             receive(on(ANY, :continued),
                                     on(TIMEOUT, :timeout),
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              expect(a.terminated.reason).to eq :kill
            end

            specify 13 do
              body = { on_thread:
                           -> do
                             b = spawn(link: true) { terminate :kill }
                             trap
                             [b, receive(timeout: 1)]
                           end,
                       on_pool:
                           -> do
                             b = spawn(link: true) { terminate :kill }
                             trap
                             receive(on(ANY) { |v| [b, v] },
                                     on(TIMEOUT, :timeout),
                                     timeout: 1)
                           end }

              a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))

              b, captured = a.terminated.value!

              expect(b.terminated.reason).to eq :kill
              expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :kill)
            end

          end
        end

        specify 'spawn(link: true)' do
          a = Concurrent::ErlangActor.spawn(type: type) do
            b = spawn(link: true) { :v }
            linked? b
          end
          expect(a.terminated.value!).to be_truthy

          a = Concurrent::ErlangActor.spawn(type: type) do
            b = spawn { :v }
            linked? b
          end
          expect(a.terminated.value!).to be_falsey
        end

        specify 'termination' do
          a = Concurrent::ErlangActor.spawn(type: type) { :v }
          expect(a.terminated.value!).to eq :v

          a = Concurrent::ErlangActor.spawn(type: type) { raise 'err' }
          expect { a.terminated.value! }.to raise_error(RuntimeError, 'err')

          a = Concurrent::ErlangActor.spawn(type: type) { terminate :normal, value: :val }
          expect(a.terminated.value!).to eq :val

          a = Concurrent::ErlangActor.spawn(type: type) { terminate :er }
          expect(a.terminated.reason).to eq :er
        end

        describe 'asking' do
          specify "replies" do
            body = { on_thread: -> { reply receive },
                     on_pool:   -> { receive { |v| reply v } } }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask(:v)).to eq :v

            body = { on_thread: -> { v = receive; reply v; reply v; },
                     on_pool:   -> { receive { |v| reply v; reply v } } }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask(:v)).to eq :v
            expect(a.terminated.value!).to be_falsey

            body = { on_thread:
                         -> do
                           v = receive
                           reply v
                           reply_resolution true, v.to_s, nil
                         end,
                     on_pool:
                         -> do
                           receive do |v|
                             reply v
                             reply_resolution true, v.to_s, nil
                           end
                         end }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask(:v)).to eq :v
            expect(a.terminated.value!).to be_falsey

            body = { on_thread: -> { reply_resolution false, nil, receive },
                     on_pool:   -> { receive { |v| reply_resolution false, nil, v } } }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect { a.ask(:err) }.to raise_error StandardError, 'err'

            body = { on_thread: -> { reply_resolution false, nil, receive },
                     on_pool:   -> { receive { |v| reply_resolution false, nil, v } } }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask_op(:err).reason).to eq :err
          end

          specify "timing out" do
            count_down = Concurrent::CountDownLatch.new
            body = { on_thread: -> { m = receive; count_down.wait; reply m },
                     on_pool:   -> { receive { |m| count_down.wait; reply m } } }
            a    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask(:err, 0, 42)).to eq 42
            count_down.count_down
            expect(a.terminated.value!).to eq false

            body = { on_thread: -> { reply receive },
                     on_pool:   -> { receive { |m| reply m } } }
            b    = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(b.ask(:v, 1, 42)).to eq :v
            expect(b.terminated.value!).to eq true
          end

          specify "rejects on no reply" do
            body = { on_thread: -> { receive; receive },
                     on_pool:   -> { receive { receive {} } } }

            a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type))
            expect(a.ask_op(:v).reason).to eq Concurrent::ErlangActor::NoReply
            expect { raise a.ask_op(:v).wait }.to raise_error Concurrent::ErlangActor::NoActor
            expect { raise a.ask(:v) }.to raise_error Concurrent::ErlangActor::NoActor
          end

        end
      end

      describe 'on thread' do
        let(:type) { :on_thread }
        it_behaves_like 'erlang actor'

        specify do
          actor = Concurrent::ErlangActor.spawn(type: :on_thread) do
            Thread.abort_on_exception = true
            while true
              receive on(Symbol) { |s| reply s.to_s },
                      on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
                      # put last works as else
                      on(ANY) { |v| reply :bad_message; terminate [:bad_message, v] }
            end
          end
          expect(actor.ask(1)).to eq 2
          expect(actor.ask(:value)).to eq 'value'
          expect(actor.ask(-1)).to eq :bad_message
          expect { actor.ask 'junk' }.to raise_error Concurrent::ErlangActor::NoActor
          expect(actor.terminated.reason).to eq [:bad_message, -1]
        end
      end

      describe 'on pool' do
        let(:type) { :on_pool }
        it_behaves_like 'erlang actor'

        include Concurrent::ErlangActor::EnvironmentConstants

        specify "receives message repeatedly with keep" do
          actor = Concurrent::ErlangActor.spawn(type: :on_pool) do
            receive on(ANY) { |v| v == :done ? terminate(:normal, value: 42) : reply(v) },
                    keep: true
          end
          expect(actor.ask(1)).to eq 1
          expect(actor.ask(2)).to eq 2
          actor.tell :done
          expect(actor.terminated.value!).to eq 42
        end

        specify "class defined" do
          definition_module = Module.new do
            def start
              @sum = 0
              receive on(Numeric, &method(:count)),
                      on(:done, &method(:stop)),
                      on(TIMEOUT, &method(:fail)),
                      keep:    true,
                      timeout: 0.1
            end

            def count(message)
              reply @sum += message
            end

            def stop(_message)
              terminate :normal, value: @sum
            end

            def fail(_message)
              terminate :timeout
            end
          end
          definition_class  = Class.new Concurrent::ErlangActor::Environment do
            include definition_module
          end

          actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_class) { start }
          actor.tell 1
          expect(actor.ask(2)).to eq 3
          actor.tell :done
          expect(actor.terminated.value!).to eq 3

          actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_module)
          actor.tell 1
          expect(actor.ask(2)).to eq 3
          expect(actor.terminated.reason).to eq :timeout
        end

      end
    end
  end
end
