File: simple_http_broker.cpp

package info (click to toggle)
actor-framework 0.17.6-3.2
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 9,008 kB
  • sloc: cpp: 77,684; sh: 674; python: 309; makefile: 13
file content (95 lines) | stat: -rw-r--r-- 2,131 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <iostream>
#include <chrono>

#include "caf/all.hpp"
#include "caf/io/all.hpp"

using std::cout;
using std::cerr;
using std::endl;

using namespace caf;
using namespace caf::io;

namespace {

// Canned HTTP/1.1 response that connection_worker writes back to the client.
// It consists of a "200 OK" status line, three headers, an empty line, and a
// chunked body: one chunk announced as 0xd (13) bytes holding "Hi there! :)"
// plus its newline, followed by the zero-length chunk. Every line break in
// the payload is a bare '\n', spelled out explicitly below.
constexpr char http_ok[] =
  "HTTP/1.1 200 OK\n"
  "Content-Type: text/plain\n"
  "Connection: keep-alive\n"
  "Transfer-Encoding: chunked\n"
  "\n"
  "d\n"
  "Hi there! :)\n"
  "\n"
  "0\n"
  "\n"
  "\n";

/// Returns the number of characters in a string literal (or any other
/// NUL-terminated `const char` array), not counting the terminating NUL.
///
/// `Size` is the full extent of the array, which for a string literal
/// includes the implicit `'\0'`. Using that extent directly as a byte count
/// would treat the terminator as part of the payload, so it is subtracted.
///
/// @tparam Size extent of the array argument, deduced by the compiler.
/// @returns `Size - 1`, i.e. the same value `strlen` would report for a
///          literal without embedded NUL characters.
template <size_t Size>
constexpr size_t cstr_size(const char (&)[Size]) {
  return Size - 1;
}

// Behavior of the broker that serves a single connection `hdl`.
//
// The incoming request is never inspected: the first chunk of data that
// arrives is answered with the canned `http_ok` response, after which the
// worker quits. The worker also quits when the connection is closed.
behavior connection_worker(broker* self, connection_handle hdl) {
  // Cap every read on this connection at 1024 bytes.
  const auto read_policy = receive_policy::at_most(1024);
  self->configure_read(hdl, read_policy);
  return {
    // Any data at all counts as a request: reply and terminate.
    [self](const new_data_msg& incoming) {
      self->write(incoming.handle, cstr_size(http_ok), http_ok);
      self->quit();
    },
    // Nothing left to serve once the connection is gone.
    [self](const connection_closed_msg&) { self->quit(); }
  };
}

// Behavior of the accepting broker.
//
// Every new connection is handed to a forked `connection_worker`, which is
// both monitored and linked. Each down message increments a counter; once per
// second a tick message prints the counter and resets it to zero.
behavior server(broker* self) {
  // Heap-allocated so the down handler and the tick handler, which each hold
  // their own copy of the pointer, update the same integer.
  auto finished = std::make_shared<int>(0);
  // Sends this broker a tick message with a one-second delay.
  auto schedule_tick = [self] {
    self->delayed_send(self, std::chrono::seconds(1), tick_atom_v);
  };
  self->set_down_handler([finished](down_msg&) { ++*finished; });
  schedule_tick();
  return {
    [self](const new_connection_msg& ncm) {
      auto worker = self->fork(connection_worker, ncm.handle);
      self->monitor(worker);
      self->link_to(worker);
    },
    [self, finished, schedule_tick](tick_atom) {
      aout(self) << "Finished " << *finished << " requests per second."
                 << endl;
      *finished = 0;
      // Re-arm the timer for the next reporting interval.
      schedule_tick();
    }
  };
}

class config : public actor_system_config {
public:
  uint16_t port = 0;

  config() {
    opt_group{custom_options_, "global"}
    .add(port, "port,p", "set port");
  }
};

// Spawns the HTTP server broker on the configured port, then blocks until a
// line is read from standard input and finally asks the server to shut down.
// If the server cannot be spawned, the error is reported on stderr and the
// function returns without waiting for input.
void caf_main(actor_system& system, const config& cfg) {
  auto maybe_server = system.middleman().spawn_server(server, cfg.port);
  if (maybe_server) {
    cout << "*** listening on port " << cfg.port << endl;
    cout << "*** to quit the program, simply press <enter>" << endl;
    // Block until the user enters a line; its content is ignored.
    std::string ignored_line;
    std::getline(std::cin, ignored_line);
    // Ask the server to terminate.
    anon_send_exit(*maybe_server, exit_reason::user_shutdown);
  } else {
    cerr << "*** cannot spawn server: " << to_string(maybe_server.error())
         << endl;
  }
}

} // namespace

// Entry point macro from CAF, instantiated with the io::middleman module.
// NOTE(review): presumably this generates main(), loads the listed module and
// then invokes caf_main() defined above — confirm against the CAF manual.
CAF_MAIN(io::middleman)