File: attach_stream_source.hpp

package info (click to toggle)
actor-framework 0.17.6-3.2
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 9,008 kB
  • sloc: cpp: 77,684; sh: 674; python: 309; makefile: 13
file content (171 lines) | stat: -rw-r--r-- 9,347 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/******************************************************************************
 *                       ____    _    _____                                   *
 *                      / ___|  / \  |  ___|    C++                           *
 *                     | |     / _ \ | |_       Actor                         *
 *                     | |___ / ___ \|  _|      Framework                     *
 *                      \____/_/   \_|_|                                      *
 *                                                                            *
 * Copyright 2011-2019 Dominik Charousset                                     *
 *                                                                            *
 * Distributed under the terms and conditions of the BSD 3-Clause License or  *
 * (at your option) under the terms and conditions of the Boost Software      *
 * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE.       *
 *                                                                            *
 * If you did not receive a copy of the license files, see                    *
 * http://opensource.org/licenses/BSD-3-Clause and                            *
 * http://www.boost.org/LICENSE_1_0.txt.                                      *
 ******************************************************************************/

#pragma once

#include <tuple>

#include "caf/broadcast_downstream_manager.hpp"
#include "caf/check_typed_input.hpp"
#include "caf/default_downstream_manager.hpp"
#include "caf/detail/stream_source_driver_impl.hpp"
#include "caf/detail/stream_source_impl.hpp"
#include "caf/detail/type_list.hpp"
#include "caf/detail/type_traits.hpp"
#include "caf/downstream_manager.hpp"
#include "caf/fwd.hpp"
#include "caf/is_actor_handle.hpp"
#include "caf/make_source_result.hpp"
#include "caf/policy/arg.hpp"
#include "caf/response_type.hpp"
#include "caf/stream_source.hpp"

namespace caf {

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with `Driver`.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
/// @param pull Function object for generating downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Optional cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class Driver, class... Ts, class... CtorArgs>
make_source_result_t<typename Driver::downstream_manager_type, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs,
                     CtorArgs&&... ctor_args) {
  using namespace detail;
  auto mgr = make_stream_source<Driver>(self,
                                        std::forward<CtorArgs>(ctor_args)...);
  auto slot = mgr->add_outbound_path(std::move(xs));
  return {slot, std::move(mgr)};
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t, class Trait = stream_source_trait_t<Pull>,
          class DownstreamManager = broadcast_downstream_manager<
            typename Trait::output>>
make_source_result_t<DownstreamManager, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> = {}) {
  using state_type = typename Trait::state;
  static_assert(std::is_same<
                  void(state_type&),
                  typename detail::get_callable_trait<Init>::fun_sig>::value,
                "Expected signature `void (State&)` for init function");
  static_assert(std::is_same<
                  bool(const state_type&),
                  typename detail::get_callable_trait<Done>::fun_sig>::value,
                "Expected signature `bool (const State&)` "
                "for done predicate function");
  using driver = detail::stream_source_driver_impl<DownstreamManager, Pull,
                                                   Done, Finalize>;
  return attach_stream_source<driver>(self, std::move(xs), std::move(init),
                                      std::move(pull), std::move(done),
                                      std::move(fin));
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver.
/// @param self Points to the hosting actor.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class Init, class Pull, class Done, class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!is_actor_handle<Init>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, Init init, Pull pull, Done done,
                     Finalize finalize = {},
                     policy::arg<DownstreamManager> token = {}) {
  return attach_stream_source(self, std::make_tuple(), init, pull, done,
                              finalize, token);
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest,
                     std::tuple<Ts...> xs, Init init, Pull pull, Done done,
                     Finalize fin = {}, policy::arg<DownstreamManager> = {}) {
  using namespace detail;
  using token = type_list<stream<typename DownstreamManager::output_type>,
                          strip_and_convert_t<Ts>...>;
  static_assert(response_type_unbox<signatures_of_t<ActorHandle>, token>::valid,
                "receiver does not accept the stream handshake");
  using driver = detail::stream_source_driver_impl<DownstreamManager, Pull,
                                                   Done, Finalize>;
  auto mgr = detail::make_stream_source<driver>(self, std::move(init),
                                                std::move(pull),
                                                std::move(done),
                                                std::move(fin));
  auto slot = mgr->add_outbound_path(dest, std::move(xs));
  return {slot, std::move(mgr)};
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> token = {}) {
  return attach_stream_source(self, dest, std::make_tuple(), std::move(init),
                              std::move(pull), std::move(done), std::move(fin),
                              token);
}

} // namespace caf