File: curl_fuse.cpp

package info (click to toggle)
actor-framework 0.17.6-3.2
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 9,008 kB
  • sloc: cpp: 77,684; sh: 674; python: 309; makefile: 13
file content (366 lines) | stat: -rw-r--r-- 13,098 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/******************************************************************************
 * This example                                                               *
 * - emulates a client launching a request every 10-300ms                     *
 * - uses a CURL-backend consisting of a master and 10 workers                *
 * - runs until it is shut down by a CTRL+C signal                            *
 *                                                                            *
 *                                                                            *
 * Schematic view:                                                            *
 *                                                                            *
 *    client      |    client_job    |    curl_master    |    curl_worker     *
 *          /--------------|*|-------------\       /-------------|*|          *
 *         /---------------|*|--------------\     /                           *
 *        /----------------|*|---------------\   /                            *
 *     |*| ----------------|*|----------------|*|----------------|*|          *
 *        \________________|*|_______________/   \                            *
 *         \_______________|*|______________/     \                           *
 *          \______________|*|_____________/       \-------------|*|          *
 *                                                                            *
 *                                                                            *
 * Communication pattern:                                                     *
 *                                                                            *
 *        client_job      curl_master        curl_worker                      *
 *          |                  |                  |                           *
 *          | ----(read)-----> |                  |                           *
 *          |                  | --(forward)----> |                           *
 *          |                                     |---\                       *
 *          |                                     |   |                       *
 *          |                                     |<--/                       *
 *          | <-------------(reply)-------------- |                           *
 *          X                                                                 *
 ******************************************************************************/

// C includes
#include <csignal>
#include <cstdlib>
#include <ctime>

// C++ includes
#include <iostream>
#include <random>
#include <string>
#include <vector>

// CAF
#include "caf/all.hpp"
#include "caf/io/all.hpp"

CAF_PUSH_WARNINGS
#include <curl/curl.h>
CAF_POP_WARNINGS

// disable some clang warnings here caused by CURL macros
#ifdef __clang__
#  pragma clang diagnostic ignored "-Wshorten-64-to-32"
#  pragma clang diagnostic ignored "-Wdisabled-macro-expansion"
#  pragma clang diagnostic ignored "-Wunused-const-variable"
#endif // __clang__

// Type ID block "curl_fuse": lists the custom message payload type and the
// atoms used by the actors in this file. The block is referenced again by the
// CAF_MAIN invocation at the end of the file (as id_block::curl_fuse).
CAF_BEGIN_TYPE_ID_BLOCK(curl_fuse, first_custom_type_id)

  // payload of 'reply' messages (this file aliases it as buffer_type)
  CAF_ADD_TYPE_ID(curl_fuse, (std::vector<char>) )

  // Atoms tagging the messages exchanged in this file. The second macro
  // argument names the namespace `custom`, which this file pulls in with a
  // using-directive.
  // read:     download request (client_job -> curl_master -> curl_worker)
  // fail:     handled by client_job; never sent anywhere in this file
  // next:     tick the client sends to itself to launch the next job
  // reply:    successful download result (curl_worker -> client_job)
  // finished: worker became available again (curl_worker -> curl_master)
  CAF_ADD_ATOM(curl_fuse, custom, read_atom, "read")
  CAF_ADD_ATOM(curl_fuse, custom, fail_atom, "fail")
  CAF_ADD_ATOM(curl_fuse, custom, next_atom, "next")
  CAF_ADD_ATOM(curl_fuse, custom, reply_atom, "reply")
  CAF_ADD_ATOM(curl_fuse, custom, finished_atom, "finished")

CAF_END_TYPE_ID_BLOCK(curl_fuse)

// make the CAF API and the atoms of namespace `custom` (the namespace named in
// this file's CAF_ADD_ATOM lines) available without qualification
using namespace caf;
using namespace custom;

// byte buffer that collects downloaded data in a curl_worker and is shipped to
// the client_job inside the 'reply' message
using buffer_type = std::vector<char>;

namespace color {

// Escape sequences understood by UNIX terminals. Each constant is a
// NUL-terminated character array that can be streamed as-is.

// -- back to the terminal's default rendering --------------------------------

constexpr char reset[]{"\033[0m"};
constexpr char reset_endl[]{"\033[0m\n"}; // reset followed by a line break

// -- regular foreground colors ------------------------------------------------

constexpr char black[]{"\033[30m"};
constexpr char red[]{"\033[31m"};
constexpr char green[]{"\033[32m"};
constexpr char yellow[]{"\033[33m"};
constexpr char blue[]{"\033[34m"};
constexpr char magenta[]{"\033[35m"};
constexpr char cyan[]{"\033[36m"};
constexpr char white[]{"\033[37m"};

// -- bold foreground colors (bold attribute followed by the color code) -------

constexpr char bold_black[]{"\033[1m\033[30m"};
constexpr char bold_red[]{"\033[1m\033[31m"};
constexpr char bold_green[]{"\033[1m\033[32m"};
constexpr char bold_yellow[]{"\033[1m\033[33m"};
constexpr char bold_blue[]{"\033[1m\033[34m"};
constexpr char bold_magenta[]{"\033[1m\033[35m"};
constexpr char bold_cyan[]{"\033[1m\033[36m"};
constexpr char bold_white[]{"\033[1m\033[37m"};

} // namespace color

// size of the worker pool, i.e., how many curl_worker actors the master spawns
constexpr size_t num_curl_workers{10};

// shortest pause (in milliseconds) between two consecutive HTTP requests
constexpr int min_req_interval{10};

// longest pause (in milliseconds) between two consecutive HTTP requests
constexpr int max_req_interval{300};

// put everything into anonymous namespace (except main)
namespace {

// provides print utility, a name, and a parent
struct base_state {
  base_state(local_actor* thisptr) : self(thisptr) {
    // nop
  }

  actor_ostream print() {
    return aout(self) << color << name << " (id = " << self->id() << "): ";
  }

  virtual bool init(std::string m_name, std::string m_color) {
    name = std::move(m_name);
    color = std::move(m_color);
    print() << "started" << color::reset_endl;
    return true;
  }

  virtual ~base_state() {
    print() << "done" << color::reset_endl;
  }

  local_actor* self;
  std::string name;
  std::string color;
};

// Actor that performs exactly one HTTP request: it asks `parent` for bytes 0
// through 4095 of a fixed URL, logs the outcome, and quits.
behavior client_job(stateful_actor<base_state>* self, const actor& parent) {
  if (self->state.init("client-job", color::blue)) {
    const uint64_t first_byte = 0;
    const uint64_t last_byte = 4095;
    self->send(parent, read_atom_v, "http://www.example.com/index.html",
               first_byte, last_byte);
    return {
      // download succeeded: report the size of the payload
      [self](reply_atom, const buffer_type& buf) {
        self->state.print() << "successfully received " << buf.size()
                            << " bytes" << color::reset_endl;
        self->quit();
      },
      // download failed
      [self](fail_atom) {
        self->state.print() << "failure" << color::reset_endl;
        self->quit();
      },
    };
  }
  // initialization failed; an empty behavior terminates the actor
  return {};
}

// State of the request generator: counts the spawned jobs and owns the random
// number machinery for picking the pause between two jobs.
struct client_state : base_state {
  // number of client_job actors spawned so far
  size_t count = 0;
  // entropy source; declared before `re` because it seeds the engine
  std::random_device rd;
  // pseudo-random engine, seeded once from `rd`
  std::default_random_engine re;
  // delay in the range [min_req_interval, max_req_interval]
  std::uniform_int_distribution<int> dist;

  client_state(local_actor* selfptr)
    : base_state(selfptr),
      re(rd()),
      dist(min_req_interval, max_req_interval) {
    // nop
  }
};

// Request generator: links itself to `parent` and then keeps spawning one
// client_job per 'next' tick, re-arming the tick with a random delay.
behavior client(stateful_actor<client_state>* self, const actor& parent) {
  using std::chrono::milliseconds;
  self->link_to(parent);
  if (!self->state.init("client", color::green))
    return {}; // an empty behavior terminates the actor
  // trigger the first job right away
  self->send(self, next_atom_v);
  return {[self, parent](next_atom) {
    auto& st = self->state;
    const auto job_nr = ++st.count;
    st.print() << "spawn new client_job (nr. " << job_nr << ")"
               << color::reset_endl;
    // a client_job performs IO, hence it gets a thread of its own
    self->spawn<detached + linked>(client_job, parent);
    // schedule the next job after a randomly chosen pause
    self->delayed_send(self, milliseconds(st.dist(st.re)), next_atom_v);
  }};
}

// State of a curl_worker: owns one CURL easy handle plus the buffer that
// collects the body of the transfer currently in flight.
struct curl_state : base_state {
  curl_state(local_actor* selfptr) : base_state(selfptr) {
    // nop
  }

  // Releases the easy handle if init() managed to create one.
  ~curl_state() override {
    if (curl != nullptr)
      curl_easy_cleanup(curl);
  }

  // Write callback registered via CURLOPT_WRITEFUNCTION.
  // @param data  chunk of received bytes (not NUL-terminated)
  // @param bsize size of one element
  // @param nmemb number of elements in `data`
  // @param userp the curl_state registered via CURLOPT_WRITEDATA
  // @returns the number of bytes consumed, i.e., always the whole chunk
  static size_t callback(void* data, size_t bsize, size_t nmemb, void* userp) {
    const size_t size = bsize * nmemb;
    auto& buf = static_cast<curl_state*>(userp)->buf;
    auto first = static_cast<char*>(data);
    // append the *entire* chunk; its length is bsize * nmemb bytes, not bsize
    buf.insert(buf.end(), first, first + size);
    return size;
  }

  // Creates and configures the easy handle before running the base class
  // initialization. Returns false if no handle could be created.
  bool init(std::string m_name, std::string m_color) override {
    curl = curl_easy_init();
    if (curl == nullptr)
      return false;
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &curl_state::callback);
    // libcurl reads this option as a `long` through varargs, so pass a long
    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
    return base_state::init(std::move(m_name), std::move(m_color));
  }

  // easy handle of this worker; nullptr until init() succeeded
  CURL* curl = nullptr;
  // bytes received so far for the current transfer
  buffer_type buf;
};

// Manages a CURL session: handles 'read' requests by downloading the byte
// range `offset`-`range` of the URL `fname`. The transfer is repeated (with a
// short pause in between) until the server answers with HTTP 200 or 206; the
// worker then notifies `parent` and responds with the received bytes.
behavior curl_worker(stateful_actor<curl_state>* self, const actor& parent) {
  if (!self->state.init("curl-worker", color::yellow))
    return {}; // returning an empty behavior terminates the actor
  return {[=](read_atom, const std::string& fname, uint64_t offset,
              uint64_t range) -> message {
    auto& st = self->state;
    st.print() << "read" << color::reset_endl;
    // URL, range and callback target are the same for every attempt and
    // libcurl keeps options on the handle (string arguments are copied), so
    // the transfer is configured once instead of on every loop iteration
    const auto byte_range = std::to_string(offset) + "-"
                            + std::to_string(range);
    curl_easy_setopt(st.curl, CURLOPT_URL, fname.c_str());
    curl_easy_setopt(st.curl, CURLOPT_RANGE, byte_range.c_str());
    // the write callback receives this pointer as its `userp` argument
    curl_easy_setopt(st.curl, CURLOPT_WRITEDATA, static_cast<void*>(&st));
    for (;;) {
      // discard whatever a previous, unsuccessful attempt left behind
      st.buf.clear();
      // launch file transfer
      auto res = curl_easy_perform(st.curl);
      if (res != CURLE_OK) {
        st.print() << "curl_easy_perform() failed: " << curl_easy_strerror(res)
                   << color::reset_endl;
      } else {
        long hc = 0; // http return code
        curl_easy_getinfo(st.curl, CURLINFO_RESPONSE_CODE, &hc);
        switch (hc) {
          default:
            st.print() << "http error: download failed with "
                       << "'HTTP RETURN CODE': " << hc << color::reset_endl;
            break;
          case 200: // ok
          case 206: // partial content
            st.print() << "received " << st.buf.size()
                       << " bytes with 'HTTP RETURN CODE': " << hc
                       << color::reset_endl;
            // tell parent that this worker is done
            self->send(parent, finished_atom_v);
            return make_message(reply_atom_v, std::move(st.buf));
          case 404: // file does not exist
            st.print() << "http error: download failed with "
                       << "'HTTP RETURN CODE': 404 (file does "
                       << "not exist!)" << color::reset_endl;
            break;
        }
      }
      // avoid 100% cpu utilization if remote side is not accessible
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
  }};
}

struct master_state : base_state {
  master_state(local_actor* selfptr) : base_state(selfptr) {
    // nop
  }
  std::vector<actor> idle;
  std::vector<actor> busy;
};

// Dispatcher of the CURL backend: spawns num_curl_workers workers, delegates
// each 'read' request to an idle worker, and moves workers back to the idle
// list once they report 'finished'.
behavior curl_master(stateful_actor<master_state>* self) {
  if (!self->state.init("curl-master", color::magenta))
    return {}; // returning an empty behavior terminates the actor
  // spawn workers
  for (size_t i = 0; i < num_curl_workers; ++i)
    self->state.idle.push_back(
      self->spawn<detached + linked>(curl_worker, self));
  // moves the sender of the current message from `busy` back to `idle`;
  // senders that are not registered as busy are ignored
  auto worker_finished = [=] {
    auto sender = self->current_sender();
    auto last = self->state.busy.end();
    auto i = std::find(self->state.busy.begin(), last, sender);
    if (i == last)
      return;
    self->state.idle.push_back(*i);
    self->state.busy.erase(i);
    self->state.print() << "worker is done" << color::reset_endl;
  };
  self->state.print() << "spawned " << self->state.idle.size() << " worker(s)"
                      << color::reset_endl;
  return {
    [=](read_atom rd, std::string& str, uint64_t x, uint64_t y) {
      auto& st = self->state;
      st.print() << "received {'read'}" << color::reset_endl;
      // forward job to an idle worker
      actor worker = st.idle.back();
      st.idle.pop_back();
      st.busy.push_back(worker);
      self->delegate(worker, rd, std::move(str), x, y);
      st.print() << st.busy.size() << " active jobs" << color::reset_endl;
      if (st.idle.empty()) {
        // wait until at least one worker finished its job
        // NOTE(review): while this behavior is active only 'finished' is
        // handled; what happens to incoming 'read' messages depends on the
        // actor's default handler -- confirm they are skipped, not dropped.
        self->become(keep_behavior, [=](finished_atom) {
          worker_finished();
          // Return to the regular behavior only if a worker really became
          // available: a 'finished' from an unknown sender is ignored above,
          // and the 'read' handler must never run with an empty idle list
          // (it calls idle.back()).
          if (!self->state.idle.empty())
            self->unbecome();
        });
      }
    },
    [=](finished_atom) { worker_finished(); },
  };
}

// CTRL+C indicator: set by the SIGINT handler that caf_main installs and
// polled by caf_main's wait loop
std::atomic<bool> shutdown_flag(false);

} // namespace

// Program logic: installs the CTRL+C handler, starts the CURL backend and the
// client, waits for CTRL+C, and then shuts everything down in order.
void caf_main(actor_system& system) {
  // install signal handler; the struct is value-initialized and mask/flags
  // are cleared explicitly so that sigaction() never sees indeterminate fields
  struct sigaction act {};
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  act.sa_handler = [](int) { shutdown_flag = true; };
  auto set_sighandler = [&] {
    if (sigaction(SIGINT, &act, nullptr) != 0) {
      std::cerr << "fatal: cannot set signal handler" << std::endl;
      abort();
    }
  };
  set_sighandler();
  // initialize CURL; no other CURL function may be used if this fails
  if (curl_global_init(CURL_GLOBAL_DEFAULT) != CURLE_OK) {
    std::cerr << "fatal: cannot initialize CURL" << std::endl;
    abort();
  }
  // get a scoped actor for the communication with our CURL actors
  scoped_actor self{system};
  // spawn client and curl_master
  auto master = self->spawn<detached>(curl_master);
  self->spawn<detached>(client, master);
  // poll CTRL+C flag every second
  while (!shutdown_flag)
    std::this_thread::sleep_for(std::chrono::seconds(1));
  aout(self) << color::cyan << "received CTRL+C" << color::reset_endl;
  // shutdown actors; the client and the workers are linked to the master
  anon_send_exit(master, exit_reason::user_shutdown);
  // await actors; a second CTRL+C aborts the process instead of waiting
  act.sa_handler = [](int) { abort(); };
  set_sighandler();
  aout(self) << color::cyan
             << "await CURL; this may take a while "
                "(press CTRL+C again to abort)"
             << color::reset_endl;
  self->await_all_other_actors_done();
  // shutdown CURL
  curl_global_cleanup();
}

// Hands the curl_fuse type ID block and the I/O middleman module to CAF_MAIN.
// NOTE(review): presumably the macro expands to a main() that sets up the
// actor system and then calls caf_main() above -- confirm against the CAF
// documentation.
CAF_MAIN(id_block::curl_fuse, io::middleman)