File: remote_actor.cpp

package info (click to toggle)
actor-framework 0.18.7-1~exp1
  • links: PTS
  • area: main
  • in suites: experimental
  • size: 8,740 kB
  • sloc: cpp: 85,162; sh: 491; python: 187; makefile: 11
file content (129 lines) | stat: -rw-r--r-- 3,222 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.

#define CAF_SUITE io.remote_actor

#include "io-test.hpp"

#include <algorithm>
#include <sstream>
#include <utility>
#include <vector>

#include "caf/all.hpp"
#include "caf/io/all.hpp"

using namespace caf;

namespace {

// Counters and results that the test bodies share with the actors they spawn.
struct suite_state {
  suite_state() = default;

  // Bumped by the `ping` actor alongside every ping it sends.
  int pings{0};

  // Bumped by the `pong` actor for every ping it receives.
  int pongs{0};

  // Exit reason stored by the exit handler of `linking_actor`.
  error linking_result;
};

// Shared-ownership handle to the suite state; copied into actor behaviors.
using suite_state_ptr = std::shared_ptr<suite_state>;

// Client side of the ping-pong exchange.
//
// The initial behavior waits for `(ok_atom, actor)`, sends a first ping to the
// received actor and then switches to a behavior that answers each `pong_atom`
// with another ping. `ssp->pings` is incremented for every ping sent; the
// actor quits when the counter reaches 10.
behavior ping(event_based_actor* self, suite_state_ptr ssp) {
  return {
    [self, ssp](ok_atom, const actor& peer) {
      MESSAGE("received `ok_atom`");
      ++ssp->pings;
      self->send(peer, ping_atom_v);
      self->become([self, ssp, peer](pong_atom) {
        MESSAGE("ping: received pong");
        // The next ping goes out before the termination check.
        self->send(peer, ping_atom_v);
        ++ssp->pings;
        if (ssp->pings != 10)
          return;
        self->quit();
        MESSAGE("ping is done");
      });
    },
  };
}

// Server side of the ping-pong exchange.
//
// Replies to every `ping_atom` with `pong_atom_v` and counts the received
// pings in `ssp->pongs`. The actor quits when the counter reaches 10; the
// reply for that last ping is still returned.
behavior pong(event_based_actor* self, suite_state_ptr ssp) {
  return {
    [self, ssp](ping_atom) -> pong_atom {
      MESSAGE("pong: received ping");
      ++ssp->pongs;
      if (ssp->pongs == 10) {
        self->quit();
        MESSAGE("pong is done");
      }
      return pong_atom_v;
    },
  };
}

// Typed actor interface: takes an `int` and replies with an `int`.
using fragile_mirror_actor = typed_actor<replies_to<int>::with<int>>;

// Mirrors the received integer back to the sender, but calls `quit` with
// `exit_reason::user_shutdown` while handling that message.
fragile_mirror_actor::behavior_type
fragile_mirror(fragile_mirror_actor::pointer self) {
  return {
    [self](int value) -> int {
      self->quit(exit_reason::user_shutdown);
      return value;
    },
  };
}

// Sends 42 to `buddy`, links to it and installs an exit handler that stores
// the received exit reason in `ssp->linking_result` before quitting with the
// same reason. The returned behavior checks that an incoming `int` equals 42.
behavior linking_actor(event_based_actor* self,
                       const fragile_mirror_actor& buddy, suite_state_ptr ssp) {
  MESSAGE("link to mirror and send dummy message");
  self->send(buddy, 42);
  self->link_to(buddy);
  self->set_exit_handler([self, ssp](exit_msg& notice) {
    // Keep a copy of the reason so the test body can inspect it afterwards;
    // the original is then handed over to `quit`.
    ssp->linking_result = notice.reason;
    self->quit(std::move(notice.reason));
  });
  return {
    [](int i) { CHECK_EQ(i, 42); },
  };
}

// Test fixture for this suite: calls `prepare_connection` for `mars` and
// `earth` with host name "mars" and port 8080, and provides a fresh
// `suite_state` to every test case.
struct fixture : point_to_point_fixture<> {
  // State shared between the test body and the spawned actors.
  suite_state_ptr ssp = std::make_shared<suite_state>();

  fixture() {
    prepare_connection(mars, earth, "mars", 8080);
  }
};

} // namespace

BEGIN_FIXTURE_SCOPE(fixture)

// Publishes a `pong` actor on mars and requires that the handle obtained via
// `earth.remote_actor` compares equal to the original handle.
CAF_TEST(identity_semantics) {
  const auto server = mars.sys.spawn(pong, ssp);
  const auto port = mars.publish(server, 8080);
  CHECK_EQ(port, 8080u);
  const auto same_server = earth.remote_actor("mars", 8080);
  CAF_REQUIRE_EQUAL(same_server, server);
  // The pong actor never receives any ping in this test, so shut it down
  // explicitly.
  anon_send_exit(server, exit_reason::user_shutdown);
}

// Runs the full ping-pong exchange between a `ping` actor on earth and a
// published `pong` actor on mars, then checks that both counters reached 10.
CAF_TEST(ping_pong) {
  // Publish the pong actor on mars.
  auto local_pong = mars.sys.spawn(pong, ssp);
  auto port = mars.publish(local_pong, 8080);
  CHECK_EQ(port, 8080u);
  // Obtain a handle to it from earth and kick off the ping actor with it.
  auto remote_pong = earth.remote_actor("mars", 8080);
  auto pinger = earth.sys.spawn(ping, ssp);
  anon_send(pinger, ok_atom_v, remote_pong);
  run();
  CHECK_EQ(ssp->pings, 10);
  CHECK_EQ(ssp->pongs, 10);
}

// Links a `linking_actor` on earth to a published `fragile_mirror` on mars and
// checks that the exit reason recorded by the linking actor is
// `exit_reason::user_shutdown`.
CAF_TEST(remote_link) {
  // Publish the mirror on mars.
  auto local_mirror = mars.sys.spawn(fragile_mirror);
  auto port = mars.publish(local_mirror, 8080);
  CHECK_EQ(port, 8080u);
  // Obtain a typed handle to the mirror from earth and link against it.
  auto mirror = earth.remote_actor<fragile_mirror_actor>("mars", 8080);
  earth.sys.spawn(linking_actor, mirror, ssp);
  run();
  CHECK_EQ(ssp->linking_result, exit_reason::user_shutdown);
}

END_FIXTURE_SCOPE()