File: simple_broker.cpp

package info (click to toggle)
actor-framework 0.17.6-3.2
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 9,008 kB
  • sloc: cpp: 77,684; sh: 674; python: 309; makefile: 13
file content (221 lines) | stat: -rw-r--r-- 6,762 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// This example program showcases how to manually manage socket I/O using a
// broker. Server and client exchange integers in a "ping-pong protocol".
//
// Minimal setup (options are declared in `config` below):
// - ./build/bin/broker -s -p 4242
// - ./build/bin/broker -H localhost -p 4242

#include "caf/config.hpp"

#ifdef CAF_WINDOWS
#  define _WIN32_WINNT 0x0600
#  include <winsock2.h>
#else
#  include <arpa/inet.h> // htonl
#endif

#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include "caf/all.hpp"
#include "caf/io/all.hpp"

using std::cerr;
using std::cout;
using std::endl;

using namespace caf;
using namespace caf::io;

namespace {

// --(rst-attach-begin)--
// Utility function to print an exit message with custom name.
void print_on_exit(const actor& hdl, const std::string& name) {
  hdl->attach_functor([=](const error& reason) {
    cout << name << " exited: " << to_string(reason) << endl;
  });
}
// --(rst-attach-end)--

// Operation tag that prefixes every integer sent over the connection. The
// numeric values are part of the wire format, hence spelled out explicitly.
enum class op : uint8_t {
  ping = 0,
  pong = 1,
};

// Behavior of the "ping" actor.
//
// Waits for `(ok_atom, actor)`, sends the first `{ping, 1}` to that actor and
// then switches to a behavior that answers every `{pong, value}` with
// `{ping, value + 1}`. After `num_pings` pongs have been received the actor
// calls `quit()`; the reply for that last pong is still returned.
behavior ping(event_based_actor* self, size_t num_pings) {
  // Shared counter, because the handler below gets copied around.
  auto pongs_seen = std::make_shared<size_t>(0);
  auto on_pong = [self, pongs_seen, num_pings](
                   pong_atom, int32_t value) -> result<ping_atom, int32_t> {
    *pongs_seen += 1;
    if (*pongs_seen >= num_pings)
      self->quit();
    return {ping_atom_v, value + 1};
  };
  return {
    [self, on_pong](ok_atom, const actor& peer) {
      self->send(peer, ping_atom_v, int32_t{1});
      self->become(on_pong);
    },
  };
}

// Behavior of the "pong" actor: replies to `{ping, value}` with
// `{pong, value}`, echoing the integer unchanged.
behavior pong() {
  auto echo = [](ping_atom, int32_t value) -> result<pong_atom, int32_t> {
    return {pong_atom_v, value};
  };
  return {echo};
}

// Utility function for sending an integer type.
//
// Serializes `value` in network byte order (most significant byte first),
// writes the resulting `sizeof(T)` bytes to the connection `hdl` and flushes.
//
// The bytes are produced by shifting instead of calling `htonl`: `htonl`
// always swaps 32 bits, so applying it to a narrower type (e.g. the 1-byte
// operation tag) and truncating the result yields 0 on little-endian hosts.
// For 32-bit values the emitted bytes are identical to the `htonl` encoding.
template <class T>
void write_int(broker* self, connection_handle hdl, T value) {
  using unsigned_type = typename std::make_unsigned<T>::type;
  const auto bits = static_cast<unsigned_type>(value);
  unsigned char bytes[sizeof(T)];
  for (size_t i = 0; i < sizeof(T); ++i) {
    // Byte 0 carries the most significant 8 bits.
    const auto shift = 8 * (sizeof(T) - 1 - i);
    bytes[i] = static_cast<unsigned char>((bits >> shift) & 0xFF);
  }
  self->write(hdl, sizeof(T), bytes);
  self->flush(hdl);
}

// Utility function for reading an integer from incoming data.
//
// Decodes `sizeof(T)` bytes starting at `data` as an integer in network byte
// order (most significant byte first) and stores the result in `storage`.
// `data` must point to at least `sizeof(T)` readable bytes.
//
// The value is assembled byte by byte instead of calling `ntohl`: `ntohl`
// always swaps 32 bits, so applying it to a narrower type (e.g. the 1-byte
// operation tag) and truncating the result yields 0 on little-endian hosts.
// For 32-bit values the result is identical to the `ntohl` decoding.
template <class T>
void read_int(const void* data, T& storage) {
  using unsigned_type = typename std::make_unsigned<T>::type;
  const auto* bytes = static_cast<const unsigned char*>(data);
  unsigned_type bits = 0;
  for (size_t i = 0; i < sizeof(T); ++i)
    bits = static_cast<unsigned_type>((bits << 8) | bytes[i]);
  storage = static_cast<T>(bits);
}

// Implementation of our broker.
//
// Bridges a single socket connection (`hdl`) and a local actor (`buddy`):
// `{ping, i}` / `{pong, i}` messages received from actors are written to the
// connection, and frames received from the connection are decoded and
// forwarded to `buddy`. Each frame consists of one operation byte (see `op`)
// followed by a 32-bit integer.
behavior broker_impl(broker* self, connection_handle hdl, const actor& buddy) {
  // We assume this broker manages exactly one connection,
  // i.e., the connection pointed to by `hdl`.
  assert(self->num_connections() == 1);
  // Monitor buddy to quit broker if buddy is done.
  self->monitor(buddy);
  self->set_down_handler([=](down_msg& dm) {
    if (dm.source == buddy) {
      aout(self) << "our buddy is down" << endl;
      // Quit for same reason.
      self->quit(dm.reason);
    }
  });
  // Setup: we are exchanging only messages consisting of an operation tag
  // (as uint8_t) and an integer value (int32_t). The read size must match
  // exactly what the ping/pong handlers below write per message, otherwise
  // incoming frames are not delivered on frame boundaries.
  self->configure_read(
    hdl, receive_policy::exactly(sizeof(uint8_t) + sizeof(int32_t)));
  // Our message handlers.
  return {
    [=](const connection_closed_msg& msg) {
      // Brokers can multiplex any number of connections, however
      // this example assumes a broker with exactly one connection.
      if (msg.handle == hdl) {
        aout(self) << "connection closed" << endl;
        // force buddy to quit
        self->send_exit(buddy, exit_reason::remote_link_unreachable);
        self->quit(exit_reason::remote_link_unreachable);
      }
    },
    [=](ping_atom, int32_t i) {
      // Serialize as: operation tag first, then the integer.
      aout(self) << "send {ping, " << i << "}" << endl;
      write_int(self, hdl, static_cast<uint8_t>(op::ping));
      write_int(self, hdl, i);
    },
    [=](pong_atom, int32_t i) {
      // Serialize as: operation tag first, then the integer.
      aout(self) << "send {pong, " << i << "}" << endl;
      write_int(self, hdl, static_cast<uint8_t>(op::pong));
      write_int(self, hdl, i);
    },
    [=](const new_data_msg& msg) {
      // Read the operation value as uint8_t from buffer.
      uint8_t op_val;
      read_int(msg.buf.data(), op_val);
      // Read integer value from buffer; it starts right after the
      // one-byte operation tag.
      int32_t ival;
      read_int(msg.buf.data() + sizeof(uint8_t), ival);
      // Show some output.
      aout(self) << "received {" << op_val << ", " << ival << "}" << endl;
      // Send composed message to our buddy.
      switch (static_cast<op>(op_val)) {
        case op::ping:
          self->send(buddy, ping_atom_v, ival);
          break;
        case op::pong:
          self->send(buddy, pong_atom_v, ival);
          break;
        default:
          // Unknown tag: the peer does not speak our protocol.
          aout(self) << "invalid value for op_val, stop" << endl;
          self->quit(sec::invalid_argument);
      }
    },
  };
}

// Behavior of the accepting broker: on the first incoming connection it forks
// a `broker_impl` that takes over the connection (paired with `buddy`) and
// then quits, i.e., only a single connection is ever accepted.
behavior server(broker* self, const actor& buddy) {
  aout(self) << "server is running" << endl;
  auto on_connect = [self, buddy](const new_connection_msg& msg) {
    aout(self) << "server accepted new connection" << endl;
    // After forking, the new broker is responsible for the connection,
    // not this one.
    auto worker = self->fork(broker_impl, msg.handle, buddy);
    print_on_exit(worker, "broker_impl");
    aout(self) << "quit server (only accept 1 connection)" << endl;
    self->quit();
  };
  return {on_connect};
}

class config : public actor_system_config {
public:
  uint16_t port = 0;
  std::string host = "localhost";
  bool server_mode = false;

  config() {
    opt_group{custom_options_, "global"}
      .add(port, "port,p", "set port")
      .add(host, "host,H", "set host (ignored in server mode)")
      .add(server_mode, "server-mode,s", "enable server mode");
  }
};

// Starts the server side: spawns a pong actor and a `server` broker accepting
// connections on `cfg.port`. Prints an error and returns if the broker could
// not be spawned; otherwise registers exit messages for both actors.
void run_server(actor_system& system, const config& cfg) {
  cout << "run in server mode" << endl;
  auto ponger = system.spawn(pong);
  auto& mm = system.middleman();
  auto acceptor = mm.spawn_server(server, cfg.port, ponger);
  if (acceptor) {
    print_on_exit(*acceptor, "server");
    print_on_exit(ponger, "pong");
    return;
  }
  std::cerr << "failed to spawn server: " << to_string(acceptor.error())
            << endl;
}

// Starts the client side: spawns a ping actor (20 pings) and a `broker_impl`
// connected to `cfg.host`:`cfg.port`. Prints an error and returns if the
// connection could not be established; otherwise registers exit messages and
// kicks off the exchange by handing the broker to the ping actor.
void run_client(actor_system& system, const config& cfg) {
  auto ping_actor = system.spawn(ping, size_t{20});
  auto io_actor = system.middleman().spawn_client(broker_impl, cfg.host,
                                                  cfg.port, ping_actor);
  if (!io_actor) {
    std::cerr << "failed to spawn client: " << to_string(io_actor.error())
              << endl;
    return;
  }
  print_on_exit(ping_actor, "ping");
  // Same label as the server side uses for its forked `broker_impl`.
  print_on_exit(*io_actor, "broker_impl");
  // Sent on behalf of the broker: tells ping which actor to talk to.
  send_as(*io_actor, ping_actor, ok_atom_v, *io_actor);
}

// Program logic entry point: runs the server when `server_mode` is set and
// the client otherwise.
void caf_main(actor_system& system, const config& cfg) {
  if (cfg.server_mode)
    run_server(system, cfg);
  else
    run_client(system, cfg);
}

} // namespace

// Passes the I/O middleman module, which run_server/run_client access via
// `system.middleman()`. NOTE(review): presumably this macro generates the
// program's `main` and dispatches to `caf_main` above -- confirm against the
// CAF documentation.
CAF_MAIN(io::middleman)