1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#include <cassert>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <vector>
#include "caf/actor_system.hpp"
#include "caf/actor_system_config.hpp"
#include "caf/event_based_actor.hpp"
#include "caf/exec_main.hpp"
#include "caf/function_view.hpp"
#include "caf/init_global_meta_objects.hpp"
#include "caf/policy/select_all.hpp"
#include "caf/scoped_actor.hpp"
#include "caf/typed_actor.hpp"
#include "caf/typed_event_based_actor.hpp"
CAF_BEGIN_TYPE_ID_BLOCK(fan_out_request, first_custom_type_id)
CAF_ADD_ATOM(fan_out_request, row_atom)
CAF_ADD_ATOM(fan_out_request, column_atom)
CAF_ADD_ATOM(fan_out_request, average_atom)
CAF_END_TYPE_ID_BLOCK(fan_out_request)
using std::endl;
using std::chrono::seconds;
using namespace caf;
/// A simple actor for storing an integer value.
using cell = typed_actor<
// Writes a new value.
result<void>(put_atom, int),
// Reads the value.
result<int>(get_atom)>;
/// An for storing a 2-dimensional matrix of integers.
using matrix = typed_actor<
// Writes a new value to given cell (x-coordinate, y-coordinate, new-value).
result<void>(put_atom, int, int, int),
// Reads from given cell.
result<int>(get_atom, int, int),
// Computes the average for given row.
result<double>(get_atom, average_atom, row_atom, int),
// Computes the average for given column.
result<double>(get_atom, average_atom, column_atom, int)>;
struct cell_state {
int value = 0;
static constexpr const char* name = "cell";
};
cell::behavior_type cell_actor(cell::stateful_pointer<cell_state> self) {
return {
[=](put_atom, int val) { self->state.value = val; },
[=](get_atom) { return self->state.value; },
};
}
struct matrix_state {
using row_type = std::vector<cell>;
std::vector<row_type> rows;
static constexpr const char* name = "matrix";
};
matrix::behavior_type matrix_actor(matrix::stateful_pointer<matrix_state> self,
int rows, int columns) {
// Spawn all cells and return our behavior.
self->state.rows.resize(rows);
for (int row = 0; row < rows; ++row) {
auto& row_vec = self->state.rows[row];
row_vec.resize(columns);
for (int column = 0; column < columns; ++column)
row_vec[column] = self->spawn(cell_actor);
}
return {
[=](put_atom put, int row, int column, int val) {
assert(row < rows && column < columns);
return self->delegate(self->state.rows[row][column], put, val);
},
[=](get_atom get, int row, int column) {
assert(row < rows && column < columns);
return self->delegate(self->state.rows[row][column], get);
},
[=](get_atom get, average_atom, row_atom, int row) {
assert(row < rows);
auto rp = self->make_response_promise<double>();
auto& row_vec = self->state.rows[row];
self->fan_out_request<policy::select_all>(row_vec, infinite, get)
.then(
[=](std::vector<int> xs) mutable {
assert(xs.size() == static_cast<size_t>(columns));
rp.deliver(std::accumulate(xs.begin(), xs.end(), 0.0) / columns);
},
[=](error& err) mutable { rp.deliver(std::move(err)); });
return rp;
},
// --(rst-fan-out-begin)--
[=](get_atom get, average_atom, column_atom, int column) {
assert(column < columns);
std::vector<cell> columns;
columns.reserve(rows);
auto& rows_vec = self->state.rows;
for (int row = 0; row < rows; ++row)
columns.emplace_back(rows_vec[row][column]);
auto rp = self->make_response_promise<double>();
self->fan_out_request<policy::select_all>(columns, infinite, get)
.then(
[=](std::vector<int> xs) mutable {
assert(xs.size() == static_cast<size_t>(rows));
rp.deliver(std::accumulate(xs.begin(), xs.end(), 0.0) / rows);
},
[=](error& err) mutable { rp.deliver(std::move(err)); });
return rp;
},
// --(rst-fan-out-end)--
};
}
std::ostream& operator<<(std::ostream& out, const expected<int>& x) {
if (x)
return out << *x;
return out << to_string(x.error());
}
void caf_main(actor_system& sys) {
// Spawn our matrix.
static constexpr int rows = 3;
static constexpr int columns = 6;
auto mx = sys.spawn(matrix_actor, rows, columns);
auto f = make_function_view(mx);
// Set cells in our matrix to these values:
// 2 4 8 16 32 64
// 3 9 27 81 243 729
// 4 16 64 256 1024 4096
for (int row = 0; row < rows; ++row)
for (int column = 0; column < columns; ++column)
f(put_atom_v, row, column, (int) pow(row + 2, column + 1));
// Print out matrix.
for (int row = 0; row < rows; ++row) {
for (int column = 0; column < columns; ++column)
std::cout << std::setw(4) << f(get_atom_v, row, column) << ' ';
std::cout << '\n';
}
// Print out AVG for each row and column.
for (int row = 0; row < rows; ++row)
std::cout << "AVG(row " << row
<< ") = " << f(get_atom_v, average_atom_v, row_atom_v, row)
<< '\n';
for (int column = 0; column < columns; ++column)
std::cout << "AVG(column " << column
<< ") = " << f(get_atom_v, average_atom_v, column_atom_v, column)
<< '\n';
}
CAF_MAIN(id_block::fan_out_request)
|