File: integer_stream.cpp

package info (click to toggle)
actor-framework 0.17.6-3.2
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 9,008 kB
  • sloc: cpp: 77,684; sh: 674; python: 309; makefile: 13
file content (148 lines) | stat: -rw-r--r-- 5,087 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Basic, non-interactive streaming example for processing integers.

#include <iostream>
#include <vector>

#include "caf/all.hpp"

CAF_BEGIN_TYPE_ID_BLOCK(integer_stream, first_custom_type_id)

  CAF_ADD_TYPE_ID(integer_stream, (caf::stream<int32_t>) )
  CAF_ADD_TYPE_ID(integer_stream, (std::vector<int32_t>) )

CAF_END_TYPE_ID_BLOCK(integer_stream)

using std::endl;
using namespace caf;

namespace {

// --(rst-source-begin)--
// Simple source for generating a stream of integers from [0, n).
behavior int_source(event_based_actor* self) {
  return {
    [=](open_atom, int32_t n) {
      // Produce at least one value.
      if (n <= 0)
        n = 1;
      // Create a stream manager for implementing a stream source. The
      // streaming logic requires three functions: initializer, generator, and
      // predicate.
      return attach_stream_source(
        self,
        // Initializer. The type of the first argument (state) is freely
        // chosen. If no state is required, `caf::unit_t` can be used here.
        [](int32_t& x) { x = 0; },
        // Generator. This function is called by CAF to produce new stream
        // elements for downstream actors. The `x` argument is our state again
        // (with our freely chosen type). The second argument `out` points to
        // the output buffer. The template argument (here: int) determines what
        // elements downstream actors receive in this stream. Finally, `num` is
        // a hint from CAF how many elements we should ideally insert into
        // `out`. We can always insert fewer or more items.
        [n](int32_t& x, downstream<int32_t>& out, size_t num) {
          auto max_x = std::min(x + static_cast<int>(num), n);
          for (; x < max_x; ++x)
            out.push(x);
        },
        // Predicate. This function tells CAF when we reached the end.
        [n](const int32_t& x) { return x == n; });
    },
  };
}
// --(rst-source-end)--

// --(rst-stage-begin)--
// Simple stage that only selects even numbers.
behavior int_selector(event_based_actor* self) {
  return {
    [=](stream<int32_t> in) {
      // Create a stream manager for implementing a stream stage. Similar to
      // `make_source`, we need three functions: initialzer, processor, and
      // finalizer.
      return attach_stream_stage(
        self,
        // Our input source.
        in,
        // Initializer. Here, we don't need any state and simply use unit_t.
        [](unit_t&) {
          // nop
        },
        // Processor. This function takes individual input elements as `val`
        // and forwards even integers to `out`.
        [](unit_t&, downstream<int32_t>& out, int32_t val) {
          if (val % 2 == 0)
            out.push(val);
        },
        // Finalizer. Allows us to run cleanup code once the stream terminates.
        [=](unit_t&, const error& err) {
          if (err) {
            aout(self) << "int_selector aborted with error: " << err
                       << std::endl;
          } else {
            aout(self) << "int_selector finalized" << std::endl;
          }
          // else: regular stream shutdown
        });
    },
  };
}
// --(rst-stage-end)--

// --(rst-sink-begin)--
behavior int_sink(event_based_actor* self) {
  return {
    [=](stream<int32_t> in) {
      // Create a stream manager for implementing a stream sink. Once more, we
      // have to provide three functions: Initializer, Consumer, Finalizer.
      return attach_stream_sink(
        self,
        // Our input source.
        in,
        // Initializer. Here, we store all values we receive. Note that streams
        // are potentially unbound, so this is usually a bad idea outside small
        // examples like this one.
        [](std::vector<int>&) {
          // nop
        },
        // Consumer. Takes individual input elements as `val` and stores them
        // in our history.
        [](std::vector<int32_t>& xs, int32_t val) { xs.emplace_back(val); },
        // Finalizer. Allows us to run cleanup code once the stream terminates.
        [=](std::vector<int32_t>& xs, const error& err) {
          if (err) {
            aout(self) << "int_sink aborted with error: " << err << std::endl;
          } else {
            aout(self) << "int_sink finalized after receiving: " << xs
                       << std::endl;
          }
        });
    },
  };
}
// --(rst-sink-end)--

struct config : actor_system_config {
  config() {
    opt_group{custom_options_, "global"}
      .add(with_stage, "with-stage,s", "use a stage for filtering odd numbers")
      .add(n, "num-values,n", "number of values produced by the source");
  }

  bool with_stage = false;
  int32_t n = 100;
};

// --(rst-main-begin)--
void caf_main(actor_system& sys, const config& cfg) {
  auto src = sys.spawn(int_source);
  auto snk = sys.spawn(int_sink);
  auto pipeline = cfg.with_stage ? snk * sys.spawn(int_selector) * src
                                 : snk * src;
  anon_send(pipeline, open_atom_v, cfg.n);
}
// --(rst-main-end)--

} // namespace

CAF_MAIN(id_block::integer_stream)