1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
|
// Copyright Nat Goodspeed 2014.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <algorithm> // std::find_if()
#include <chrono>
#include <condition_variable>
#include <cstdlib> // EXIT_SUCCESS
#include <iostream>
#include <mutex>
#include <string>
#include <utility> // std::forward()
#include <boost/fiber/all.hpp>
#include <boost/fiber/scheduler.hpp>
// Scope tracer for the example output: prints "<label> start" when
// constructed and "<label> <closing word>" when destroyed, so the log shows
// exactly where a scope (or a fiber body) begins and ends.
class Verbose {
public:
    // Tracers are tied to one scope; copying would double the exit message.
    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;

    // d: text identifying the traced scope.
    // s: word printed after the label on destruction (defaults to "stop").
    Verbose( std::string const& d, std::string const& s="stop") :
        label_{ d },
        closing_{ s } {
        std::cout << label_ << " start" << std::endl;
    }

    ~Verbose() {
        std::cout << label_ << ' ' << closing_ << std::endl;
    }

private:
    std::string label_;     // printed at both entry and exit
    std::string closing_;   // printed at exit only
};
//[priority_props
class priority_props : public boost::fibers::fiber_properties {
public:
priority_props( boost::fibers::context * ctx):
fiber_properties( ctx), /*< Your subclass constructor must accept a
[^[class_link context]*] and pass it to
the `fiber_properties` constructor. >*/
priority_( 0) {
}
int get_priority() const {
return priority_; /*< Provide read access methods at your own discretion. >*/
}
// Call this method to alter priority, because we must notify
// priority_scheduler of any change.
void set_priority( int p) { /*<
It's important to call `notify()` on any
change in a property that can affect the
scheduler's behavior. Therefore, such
modifications should only be performed
through an access method. >*/
// Of course, it's only worth reshuffling the queue and all if we're
// actually changing the priority.
if ( p != priority_) {
priority_ = p;
notify();
}
}
// The fiber name of course is solely for purposes of this example
// program; it has nothing to do with implementing scheduler priority.
// This is a public data member -- not requiring set/get access methods --
// because we need not inform the scheduler of any change.
std::string name; /*< A property that does not affect the scheduler does
not need access methods. >*/
private:
int priority_;
};
//]
//[priority_scheduler
// Scheduling algorithm that keeps its ready queue ordered by descending
// priority_props priority. The fiber at the front of the queue runs next;
// fibers of equal priority are queued behind one another (round-robin).
// Every override is marked `override` so that a signature mismatch with the
// base class is a compile error instead of a silently unused method.
class priority_scheduler :
    public boost::fibers::algo::algorithm_with_properties< priority_props > {
private:
    typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/   rqueue_t;

    rqueue_t                    rqueue_;            // ready fibers, highest priority first
    std::mutex                  mtx_{};             // guards flag_
    std::condition_variable     cnd_{};             // signalled by notify()
    bool                        flag_{ false };     // set by notify(), consumed by suspend_until()

public:
    priority_scheduler() :
        rqueue_() {
    }

    // For a subclass of algorithm_with_properties<>, it's important to
    // override the correct awakened() overload.
    /*<< You must override the [member_link algorithm_with_properties..awakened]
         method. This is how your scheduler receives notification of a
         fiber that has become ready to run. >>*/
    void awakened( boost::fibers::context * ctx, priority_props & props) noexcept override {
        int ctx_priority = props.get_priority(); /*< `props` is the instance of
                                                   priority_props associated
                                                   with the passed fiber `ctx`. >*/
        // With this scheduler, fibers with higher priority values are
        // preferred over fibers with lower priority values. But fibers with
        // equal priority values are processed in round-robin fashion. So when
        // we're handed a new context*, put it at the end of the fibers
        // with that same priority. In other words: search for the first fiber
        // in the queue with LOWER priority, and insert before that one.
        rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(),
            [ctx_priority,this]( boost::fibers::context & c)
            { return properties( &c ).get_priority() < ctx_priority; }));
        // Now, whether or not we found a fiber with lower priority,
        // insert this new fiber here. (If none was found, `i` is end() and
        // the fiber goes to the back of the queue.)
        rqueue_.insert( i, * ctx);
//<-
        std::cout << "awakened(" << props.name << "): ";
        describe_ready_queue();
//->
    }

    /*<< You must override the [member_link algorithm_with_properties..pick_next]
         method. This is how your scheduler actually advises the fiber manager
         of the next fiber to run. >>*/
    // Returns the front of the ready queue (removing it from the queue), or
    // nullptr when the queue is empty.
    boost::fibers::context * pick_next() noexcept override {
        // if ready queue is empty, just tell caller
        if ( rqueue_.empty() ) {
            return nullptr;
        }
        boost::fibers::context * ctx( & rqueue_.front() );
        rqueue_.pop_front();
//<-
        std::cout << "pick_next() resuming " << properties( ctx).name << ": ";
        describe_ready_queue();
//->
        return ctx;
    }

    /*<< You must override [member_link algorithm_with_properties..has_ready_fibers]
      to inform the fiber manager of the state of your ready queue. >>*/
    bool has_ready_fibers() const noexcept override {
        return ! rqueue_.empty();
    }

    /*<< Overriding [member_link algorithm_with_properties..property_change]
         is optional. This override handles the case in which the running
         fiber changes the priority of another ready fiber: a fiber already in
         our queue. In that case, move the updated fiber within the queue. >>*/
    void property_change( boost::fibers::context * ctx, priority_props & props) noexcept override {
        // Although our priority_props class defines multiple properties, only
        // one of them (priority) actually calls notify() when changed. The
        // point of a property_change() override is to reshuffle the ready
        // queue according to the updated priority value.
//<-
        std::cout << "property_change(" << props.name << '(' << props.get_priority()
                  << ")): ";
//->

        // 'ctx' might not be in our queue at all, if caller is changing the
        // priority of (say) the running fiber. If it's not there, no need to
        // move it: we'll handle it next time it hits awakened().
        if ( ! ctx->ready_is_linked()) { /*<
            Your `property_change()` override must be able to
            handle the case in which the passed `ctx` is not in
            your ready queue. It might be running, or it might be
            blocked. >*/
//<-
            // hopefully user will distinguish this case by noticing that
            // the fiber with which we were called does not appear in the
            // ready queue at all
            describe_ready_queue();
//->
            return;
        }

        // Found ctx: unlink it
        ctx->ready_unlink();

        // Here we know that ctx was in our ready queue, but we've unlinked
        // it. We happen to have a method that will (re-)add a context* to the
        // right place in the ready queue.
        awakened( ctx, props);
    }

//<-
    // Prints the ready queue front-to-back as "name(priority), ..." (or
    // "[empty]") followed by a newline.
    void describe_ready_queue() {
        if ( rqueue_.empty() ) {
            std::cout << "[empty]";
        } else {
            const char * delim = "";
            for ( boost::fibers::context & ctx : rqueue_) {
                priority_props & props( properties( & ctx) );
                std::cout << delim << props.name << '(' << props.get_priority() << ')';
                delim = ", ";
            }
        }
        std::cout << std::endl;
    }
//->

    // Blocks until notify() has set flag_ or, unless `time_point` is the
    // clock's maximum value, until `time_point` is reached; then clears
    // flag_ so the next call waits afresh.
    void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept override {
        std::unique_lock< std::mutex > lk( mtx_);
        if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
            // No deadline: wait for notify() only.
            cnd_.wait( lk, [this](){ return flag_; });
        } else {
            cnd_.wait_until( lk, time_point, [this](){ return flag_; });
        }
        flag_ = false;
    }

    // Wakes a suspend_until() call: sets flag_ under the mutex, then signals
    // the condition variable after the lock has been released.
    void notify() noexcept override {
        std::unique_lock< std::mutex > lk( mtx_);
        flag_ = true;
        lk.unlock();
        cnd_.notify_all();
    }
};
//]
//[launch
// Creates a fiber running `func`, then sets the name and priority in that
// fiber's priority_props.
//   func     - callable to run in the new fiber
//   name     - label stored in the fiber's priority_props (used in log output)
//   priority - value passed to priority_props::set_priority()
// Returns the newly created fiber; the caller is expected to join it.
template< typename Fn >
boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) {
    // `func` is a forwarding reference: forward it so a temporary callable is
    // moved into the fiber instead of copied, and move-only callables work.
    boost::fibers::fiber fiber( std::forward< Fn >( func) );
    priority_props & props( fiber.properties< priority_props >() );
    props.name = name;
    props.set_priority( priority);
    return fiber;
}
//]
// Fiber body: announces itself and yields three times, logging before each
// yield, so the scheduling order shows up in the output.
void yield_fn() {
    // Copy the name up front; it is used for every log line below.
    std::string const self = boost::this_fiber::properties< priority_props >().name;
    Verbose trace( "fiber " + self);
    for ( int remaining = 3; remaining > 0; --remaining) {
        std::cout << "fiber " << self << " yielding" << std::endl;
        boost::this_fiber::yield();
    }
}
// Fiber body: waits on `barrier`, then yields once, logging before each step.
void barrier_fn( boost::fibers::barrier & barrier) {
    std::string const self = boost::this_fiber::properties< priority_props >().name;
    Verbose trace( "fiber " + self);

    std::cout << "fiber " << self << " waiting on barrier" << std::endl;
    barrier.wait();

    std::cout << "fiber " << self << " yielding" << std::endl;
    boost::this_fiber::yield();
}
//[change_fn
// Fiber body: waits on `barrier`, then sets the priority of `other` to
// `other_priority` through its priority_props.
void change_fn( boost::fibers::fiber & other,
                int other_priority,
                boost::fibers::barrier& barrier) {
    std::string const self = boost::this_fiber::properties< priority_props >().name;
    Verbose trace( "fiber " + self);

//<-
    std::cout << "fiber " << self << " waiting on barrier" << std::endl;
//->
    barrier.wait();
    // Two assumptions are made about 'other':
    // - it was waiting on this same barrier, and
    // - its priority is lower than this fiber's.
    // When both hold, 'other' has become ready but is still sitting in
    // priority_scheduler's ready queue, which is where we re-prioritize it.
    priority_props & target = other.properties< priority_props >();
//<-
    std::cout << "fiber " << self << " changing priority of " << target.name
              << " to " << other_priority << std::endl;
//->
    target.set_priority( other_priority);
}
//]
//[main
int main( int argc, char *argv[]) {
// make sure we use our priority_scheduler rather than default round_robin
boost::fibers::use_scheduling_algorithm< priority_scheduler >();
/*= ...*/
/*=}*/
//]
Verbose v("main()");
// for clarity
std::cout << "main() setting name" << std::endl;
//[main_name
boost::this_fiber::properties< priority_props >().name = "main";
//]
std::cout << "main() running tests" << std::endl;
{
Verbose v("high-priority first", "stop\n");
// verify that high-priority fiber always gets scheduled first
boost::fibers::fiber low( launch( yield_fn, "low", 1) );
boost::fibers::fiber med( launch( yield_fn, "medium", 2) );
boost::fibers::fiber hi( launch( yield_fn, "high", 3) );
std::cout << "main: high.join()" << std::endl;
hi.join();
std::cout << "main: medium.join()" << std::endl;
med.join();
std::cout << "main: low.join()" << std::endl;
low.join();
}
{
Verbose v("same priority round-robin", "stop\n");
// fibers of same priority are scheduled in round-robin order
boost::fibers::fiber a( launch( yield_fn, "a", 0) );
boost::fibers::fiber b( launch( yield_fn, "b", 0) );
boost::fibers::fiber c( launch( yield_fn, "c", 0) );
std::cout << "main: a.join()" << std::endl;
a.join();
std::cout << "main: b.join()" << std::endl;
b.join();
std::cout << "main: c.join()" << std::endl;
c.join();
}
{
Verbose v("barrier wakes up all", "stop\n");
// using a barrier wakes up all waiting fibers at the same time
boost::fibers::barrier barrier( 3);
boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) );
boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) );
boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) );
std::cout << "main: low.join()" << std::endl;
low.join();
std::cout << "main: medium.join()" << std::endl;
med.join();
std::cout << "main: high.join()" << std::endl;
hi.join();
}
{
Verbose v("change priority", "stop\n");
// change priority of a fiber in priority_scheduler's ready queue
boost::fibers::barrier barrier( 3);
boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) );
boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) );
boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) );
std::cout << "main: a.join()" << std::endl;
std::cout << "main: a.join()" << std::endl;
a.join();
std::cout << "main: b.join()" << std::endl;
b.join();
std::cout << "main: c.join()" << std::endl;
c.join();
}
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
|