File: attach_stream_source.hpp

package info (click to toggle)
actor-framework 0.18.7-1~exp1
  • links: PTS
  • area: main
  • in suites: experimental
  • size: 8,740 kB
  • sloc: cpp: 85,162; sh: 491; python: 187; makefile: 11
file content (162 lines) | stat: -rw-r--r-- 8,495 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.

#pragma once

#include <tuple>

#include "caf/broadcast_downstream_manager.hpp"
#include "caf/check_typed_input.hpp"
#include "caf/default_downstream_manager.hpp"
#include "caf/detail/implicit_conversions.hpp"
#include "caf/detail/stream_source_driver_impl.hpp"
#include "caf/detail/stream_source_impl.hpp"
#include "caf/detail/type_list.hpp"
#include "caf/detail/type_traits.hpp"
#include "caf/downstream_manager.hpp"
#include "caf/fwd.hpp"
#include "caf/is_actor_handle.hpp"
#include "caf/make_source_result.hpp"
#include "caf/policy/arg.hpp"
#include "caf/response_type.hpp"
#include "caf/stream_source.hpp"

namespace caf {

/// Attaches a new stream source to `self`. The stream source manager is
/// created for the user-supplied `Driver` type and receives one outbound path.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param ctor_args Parameter pack for constructing the driver.
/// @returns The allocated `stream_manager` and the output slot.
template <class Driver, class... Ts, class... CtorArgs>
make_source_result_t<typename Driver::downstream_manager_type, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs,
                     CtorArgs&&... ctor_args) {
  // Only this one name from `detail` is needed in this function.
  using detail::make_stream_source;
  // The constructor arguments are forwarded with their value category intact.
  auto source = make_stream_source<Driver>(
    self, std::forward<CtorArgs>(ctor_args)...);
  // The handshake arguments are handed over to the new outbound path.
  auto out_slot = source->add_outbound_path(std::move(xs));
  return {out_slot, std::move(source)};
}

/// Attaches a new stream source to `self`, using the default driver
/// implementation assembled from the given function objects.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source. Must
///             have the signature `void (State&)`.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done. Must have
///             the signature `bool (const State&)`.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t, class Trait = stream_source_trait_t<Pull>,
          class DownstreamManager = broadcast_downstream_manager<
            typename Trait::output>>
make_source_result_t<DownstreamManager, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> = {}) {
  // `State` in the diagnostics below refers to the state type of the trait.
  using state_type = typename Trait::state;
  // Reject an `init` whose signature does not match before building anything.
  using init_signature = typename detail::get_callable_trait<Init>::fun_sig;
  static_assert(std::is_same<void(state_type&), init_signature>::value,
                "Expected signature `void (State&)` for init function");
  // Likewise for the `done` predicate.
  using done_signature = typename detail::get_callable_trait<Done>::fun_sig;
  static_assert(std::is_same<bool(const state_type&), done_signature>::value,
                "Expected signature `bool (const State&)` "
                "for done predicate function");
  // Delegate to the overload that takes an explicit driver type; the function
  // objects become the constructor arguments of that driver.
  using driver_type = detail::stream_source_driver_impl<DownstreamManager,
                                                        Pull, Done, Finalize>;
  return attach_stream_source<driver_type>(self, std::move(xs),
                                           std::move(init), std::move(pull),
                                           std::move(done), std::move(fin));
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver. No user-defined handshake arguments are
/// sent: this overload delegates with an empty tuple.
///
/// Only participates in overload resolution if `Init` is not an actor handle
/// and `Trait::valid` is `true`.
/// @param self Points to the hosting actor.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @param token Tag that selects the downstream manager type.
/// @returns The allocated `stream_manager` and the output slot.
template <class Init, class Pull, class Done, class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!is_actor_handle<Init>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, Init init, Pull pull, Done done,
                     Finalize fin = {},
                     policy::arg<DownstreamManager> token = {}) {
  using output_type = typename Trait::output;
  // Both the element type and the stream wrapping it must be sendable.
  static_assert(detail::sendable<output_type>,
                "the output type of the source has no type ID, "
                "did you forget to announce it via CAF_ADD_TYPE_ID?");
  static_assert(detail::sendable<stream<output_type>>,
                "stream<T> for the output type has no type ID, "
                "did you forget to announce it via CAF_ADD_TYPE_ID?");
  // The function objects are by-value parameters that are not used again, so
  // move them into the delegate instead of copying each one a second time.
  return attach_stream_source(self, std::make_tuple(), std::move(init),
                              std::move(pull), std::move(done), std::move(fin),
                              token);
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
///
/// Only participates in overload resolution if `ActorHandle` is an actor
/// handle. Compilation fails if the signatures of `ActorHandle` do not accept
/// the stream handshake. The `Trait` parameter is not consulted by this
/// overload.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest,
                     std::tuple<Ts...> xs, Init init, Pull pull, Done done,
                     Finalize fin = {}, policy::arg<DownstreamManager> = {}) {
  using namespace detail;
  // Type list describing the handshake: the stream itself, followed by the
  // user-defined arguments after passing through `strip_and_convert_t`.
  using handshake = type_list<stream<typename DownstreamManager::output_type>,
                              strip_and_convert_t<Ts>...>;
  using handshake_check
    = response_type_unbox<signatures_of_t<ActorHandle>, handshake>;
  static_assert(handshake_check::valid,
                "receiver does not accept the stream handshake");
  // Build the manager around the default driver; the function objects become
  // the constructor arguments of that driver.
  using driver_type = detail::stream_source_driver_impl<DownstreamManager,
                                                        Pull, Done, Finalize>;
  auto source = detail::make_stream_source<driver_type>(self, std::move(init),
                                                        std::move(pull),
                                                        std::move(done),
                                                        std::move(fin));
  // Open the outbound path to `dest` with the handshake arguments.
  auto out_slot = source->add_outbound_path(dest, std::move(xs));
  return {out_slot, std::move(source)};
}

/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
/// No user-defined handshake arguments are sent: this overload delegates with
/// an empty tuple.
///
/// Only participates in overload resolution if `ActorHandle` is an actor
/// handle and `Trait::valid` is `true`.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @param token Tag that selects the downstream manager type.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> token = {}) {
  // Stand-in for the handshake arguments the caller did not provide.
  auto no_handshake_args = std::make_tuple();
  return attach_stream_source(self, dest, std::move(no_handshake_args),
                              std::move(init), std::move(pull),
                              std::move(done), std::move(fin), token);
}

} // namespace caf