// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
#pragma once
#include <tuple>
#include "caf/broadcast_downstream_manager.hpp"
#include "caf/check_typed_input.hpp"
#include "caf/default_downstream_manager.hpp"
#include "caf/detail/implicit_conversions.hpp"
#include "caf/detail/stream_source_driver_impl.hpp"
#include "caf/detail/stream_source_impl.hpp"
#include "caf/detail/type_list.hpp"
#include "caf/detail/type_traits.hpp"
#include "caf/downstream_manager.hpp"
#include "caf/fwd.hpp"
#include "caf/is_actor_handle.hpp"
#include "caf/make_source_result.hpp"
#include "caf/policy/arg.hpp"
#include "caf/response_type.hpp"
#include "caf/stream_source.hpp"
namespace caf {
/// Creates a stream source manager for `self` that is driven by the
/// user-supplied `Driver` and opens an outbound path on it.
/// @tparam Driver Driver type passed to `make_stream_source`; must provide a
///                nested `downstream_manager_type`.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param ctor_args Parameter pack for constructing the driver, forwarded
///                  as-is to `make_stream_source`.
/// @returns The allocated `stream_manager` and the output slot.
template <class Driver, class... Ts, class... CtorArgs>
make_source_result_t<typename Driver::downstream_manager_type, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs,
                     CtorArgs&&... ctor_args) {
  using namespace detail;
  // Build the manager first; the outbound path is added on the manager itself.
  auto source = make_stream_source<Driver>(
    self, std::forward<CtorArgs>(ctor_args)...);
  auto out_slot = source->add_outbound_path(std::move(xs));
  return {out_slot, std::move(source)};
}
/// Attaches a new stream source to `self` that uses the default driver
/// (`detail::stream_source_driver_impl`) assembled from the given function
/// objects.
///
/// The trailing `policy::arg` parameter is unnamed and not used in the body;
/// it only lets callers pick `DownstreamManager` through argument deduction.
/// @param self Points to the hosting actor.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
///             Must have the signature `void (State&)`.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done. Must have
///             the signature `bool (const State&)`.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t, class Trait = stream_source_trait_t<Pull>,
          class DownstreamManager = broadcast_downstream_manager<
            typename Trait::output>>
make_source_result_t<DownstreamManager, Ts...>
attach_stream_source(scheduled_actor* self, std::tuple<Ts...> xs, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> = {}) {
  using state_type = typename Trait::state;
  // Signatures of the user-provided callables, checked at compile time against
  // the state type derived from `Pull`.
  using init_signature = typename detail::get_callable_trait<Init>::fun_sig;
  using done_signature = typename detail::get_callable_trait<Done>::fun_sig;
  static_assert(std::is_same<void(state_type&), init_signature>::value,
                "Expected signature `void (State&)` for init function");
  static_assert(std::is_same<bool(const state_type&), done_signature>::value,
                "Expected signature `bool (const State&)` "
                "for done predicate function");
  using driver_type
    = detail::stream_source_driver_impl<DownstreamManager, Pull, Done,
                                        Finalize>;
  // Delegate to the overload that takes an explicit driver type.
  return attach_stream_source<driver_type>(self, std::move(xs),
                                           std::move(init), std::move(pull),
                                           std::move(done), std::move(fin));
}
/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver. This overload passes no user-defined
/// handshake arguments, i.e., it forwards an empty tuple.
///
/// Only participates in overload resolution if `Init` is not an actor handle
/// and `stream_source_trait_t<Pull>` is valid.
/// @param self Points to the hosting actor.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @param token Tag for selecting the `DownstreamManager` type.
/// @returns The allocated `stream_manager` and the output slot.
template <class Init, class Pull, class Done, class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!is_actor_handle<Init>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, Init init, Pull pull, Done done,
                     Finalize fin = {},
                     policy::arg<DownstreamManager> token = {}) {
  using output_type = typename Trait::output;
  static_assert(detail::sendable<output_type>,
                "the output type of the source has no type ID, "
                "did you forget to announce it via CAF_ADD_TYPE_ID?");
  static_assert(detail::sendable<stream<output_type>>,
                "stream<T> for the output type has no type ID, "
                "did you forget to announce it via CAF_ADD_TYPE_ID?");
  // The function objects were taken by value, so hand them on by move instead
  // of copying them a second time. This also admits move-only callables.
  return attach_stream_source(self, std::make_tuple(), std::move(init),
                              std::move(pull), std::move(done), std::move(fin),
                              token);
}
/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
///
/// Only participates in overload resolution if `ActorHandle` is an actor
/// handle. Compilation fails if the signatures of `ActorHandle` do not accept
/// a handshake consisting of the stream type followed by the types in `xs`.
/// The trailing `policy::arg` parameter is unnamed and not used in the body.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param xs User-defined arguments for the stream handshake.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class... Ts, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest,
                     std::tuple<Ts...> xs, Init init, Pull pull, Done done,
                     Finalize fin = {}, policy::arg<DownstreamManager> = {}) {
  using namespace detail;
  // Message types that `dest` must accept as the stream handshake.
  using handshake_types
    = type_list<stream<typename DownstreamManager::output_type>,
                strip_and_convert_t<Ts>...>;
  using receiver_signatures = signatures_of_t<ActorHandle>;
  static_assert(
    response_type_unbox<receiver_signatures, handshake_types>::valid,
    "receiver does not accept the stream handshake");
  using driver_type
    = detail::stream_source_driver_impl<DownstreamManager, Pull, Done,
                                        Finalize>;
  auto source = detail::make_stream_source<driver_type>(self, std::move(init),
                                                        std::move(pull),
                                                        std::move(done),
                                                        std::move(fin));
  // Open the path towards `dest` before handing the manager to the caller.
  auto out_slot = source->add_outbound_path(dest, std::move(xs));
  return {out_slot, std::move(source)};
}
/// Attaches a new stream source to `self` by creating a default stream source
/// manager with the default driver and starts sending to `dest` immediately.
/// This overload passes no user-defined handshake arguments, i.e., it forwards
/// an empty tuple.
///
/// Only participates in overload resolution if `ActorHandle` is an actor
/// handle and `stream_source_trait_t<Pull>` is valid.
/// @param self Points to the hosting actor.
/// @param dest Handle to the next stage in the pipeline.
/// @param init Function object for initializing the state of the source.
/// @param pull Generator function object for producing downstream messages.
/// @param done Predicate returning `true` when generator is done.
/// @param fin Cleanup handler.
/// @param token Tag for selecting the `DownstreamManager` type.
/// @returns The allocated `stream_manager` and the output slot.
template <class ActorHandle, class Init, class Pull, class Done,
          class Finalize = unit_t,
          class DownstreamManager = default_downstream_manager_t<Pull>,
          class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<is_actor_handle<ActorHandle>::value && Trait::valid,
                    make_source_result_t<DownstreamManager>>
attach_stream_source(scheduled_actor* self, const ActorHandle& dest, Init init,
                     Pull pull, Done done, Finalize fin = {},
                     policy::arg<DownstreamManager> token = {}) {
  // No extra handshake arguments; delegate to the overload taking a tuple.
  auto no_handshake_args = std::make_tuple();
  return attach_stream_source(self, dest, std::move(no_handshake_args),
                              std::move(init), std::move(pull),
                              std::move(done), std::move(fin), token);
}
} // namespace caf