File: priority.cpp

package info (click to toggle)
boost1.83 1.83.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 545,632 kB
  • sloc: cpp: 3,857,086; xml: 125,552; ansic: 34,414; python: 25,887; asm: 5,276; sh: 4,799; ada: 1,681; makefile: 1,629; perl: 1,212; pascal: 1,139; sql: 810; yacc: 478; ruby: 102; lisp: 24; csh: 6
file content (353 lines) | stat: -rw-r--r-- 13,290 bytes parent folder | download | duplicates (13)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
//          Copyright Nat Goodspeed 2014.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <algorithm>                // std::find_if()

#include <boost/fiber/all.hpp>
#include <boost/fiber/scheduler.hpp>

class Verbose {
public:
    Verbose( std::string const& d, std::string const& s="stop") :
        desc( d),
        stop( s) {
        std::cout << desc << " start" << std::endl;
    }

    ~Verbose() {
        std::cout << desc << ' ' << stop << std::endl;
    }

    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;

private:
    std::string     desc;
    std::string     stop;
};

//[priority_props
class priority_props : public boost::fibers::fiber_properties {
public:
    priority_props( boost::fibers::context * ctx):
        fiber_properties( ctx), /*< Your subclass constructor must accept a 
                                 [^[class_link context]*] and pass it to
                                 the `fiber_properties` constructor. >*/
        priority_( 0) {
    }

    int get_priority() const {
        return priority_; /*< Provide read access methods at your own discretion. >*/
    }

    // Call this method to alter priority, because we must notify
    // priority_scheduler of any change.
    void set_priority( int p) { /*<
            It's important to call `notify()` on any
            change in a property that can affect the
            scheduler's behavior. Therefore, such
            modifications should only be performed
            through an access method. >*/
        // Of course, it's only worth reshuffling the queue and all if we're
        // actually changing the priority.
        if ( p != priority_) {
            priority_ = p;
            notify();
        }
    }

    // The fiber name of course is solely for purposes of this example
    // program; it has nothing to do with implementing scheduler priority.
    // This is a public data member -- not requiring set/get access methods --
    // because we need not inform the scheduler of any change.
    std::string name; /*< A property that does not affect the scheduler does
                          not need access methods. >*/
private:
    int priority_;
};
//]

//[priority_scheduler
class priority_scheduler :
    public boost::fibers::algo::algorithm_with_properties< priority_props > {
private:
    typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/   rqueue_t;

    rqueue_t                                rqueue_;
    std::mutex                  mtx_{};
    std::condition_variable     cnd_{};
    bool                        flag_{ false };

public:
    priority_scheduler() :
        rqueue_() {
    }

    // For a subclass of algorithm_with_properties<>, it's important to
    // override the correct awakened() overload.
    /*<< You must override the [member_link algorithm_with_properties..awakened]
         method. This is how your scheduler receives notification of a
         fiber that has become ready to run. >>*/
    virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept {
        int ctx_priority = props.get_priority(); /*< `props` is the instance of
                                                   priority_props associated
                                                   with the passed fiber `ctx`. >*/
        // With this scheduler, fibers with higher priority values are
        // preferred over fibers with lower priority values. But fibers with
        // equal priority values are processed in round-robin fashion. So when
        // we're handed a new context*, put it at the end of the fibers
        // with that same priority. In other words: search for the first fiber
        // in the queue with LOWER priority, and insert before that one.
        rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(),
            [ctx_priority,this]( boost::fibers::context & c)
            { return properties( &c ).get_priority() < ctx_priority; }));
        // Now, whether or not we found a fiber with lower priority,
        // insert this new fiber here.
        rqueue_.insert( i, * ctx);
//<-

        std::cout << "awakened(" << props.name << "): ";
        describe_ready_queue();
//->
    }

    /*<< You must override the [member_link algorithm_with_properties..pick_next]
         method. This is how your scheduler actually advises the fiber manager
         of the next fiber to run. >>*/
    virtual boost::fibers::context * pick_next() noexcept {
        // if ready queue is empty, just tell caller
        if ( rqueue_.empty() ) {
            return nullptr;
        }
        boost::fibers::context * ctx( & rqueue_.front() );
        rqueue_.pop_front();
//<-
        std::cout << "pick_next() resuming " << properties( ctx).name << ": ";
        describe_ready_queue();
//->
        return ctx;
    }

    /*<< You must override [member_link algorithm_with_properties..has_ready_fibers]
      to inform the fiber manager of the state of your ready queue. >>*/
    virtual bool has_ready_fibers() const noexcept {
        return ! rqueue_.empty();
    }

    /*<< Overriding [member_link algorithm_with_properties..property_change]
         is optional. This override handles the case in which the running
         fiber changes the priority of another ready fiber: a fiber already in
         our queue. In that case, move the updated fiber within the queue. >>*/
    virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept {
        // Although our priority_props class defines multiple properties, only
        // one of them (priority) actually calls notify() when changed. The
        // point of a property_change() override is to reshuffle the ready
        // queue according to the updated priority value.
//<-
        std::cout << "property_change(" << props.name << '(' << props.get_priority()
                  << ")): ";
//->

        // 'ctx' might not be in our queue at all, if caller is changing the
        // priority of (say) the running fiber. If it's not there, no need to
        // move it: we'll handle it next time it hits awakened().
        if ( ! ctx->ready_is_linked()) { /*<
            Your `property_change()` override must be able to
            handle the case in which the passed `ctx` is not in
            your ready queue. It might be running, or it might be
            blocked. >*/
//<-
            // hopefully user will distinguish this case by noticing that
            // the fiber with which we were called does not appear in the
            // ready queue at all
            describe_ready_queue();
//->
            return;
        }

        // Found ctx: unlink it
        ctx->ready_unlink();

        // Here we know that ctx was in our ready queue, but we've unlinked
        // it. We happen to have a method that will (re-)add a context* to the
        // right place in the ready queue.
        awakened( ctx, props);
    }
//<-

    void describe_ready_queue() {
        if ( rqueue_.empty() ) {
            std::cout << "[empty]";
        } else {
            const char * delim = "";
            for ( boost::fibers::context & ctx : rqueue_) {
                priority_props & props( properties( & ctx) );
                std::cout << delim << props.name << '(' << props.get_priority() << ')';
                delim = ", ";
            }
        }
        std::cout << std::endl;
    }
//->

    void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
        if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
            std::unique_lock< std::mutex > lk( mtx_);
            cnd_.wait( lk, [this](){ return flag_; });
            flag_ = false;
        } else {
            std::unique_lock< std::mutex > lk( mtx_);
            cnd_.wait_until( lk, time_point, [this](){ return flag_; });
            flag_ = false;
        }
    }

    void notify() noexcept {
        std::unique_lock< std::mutex > lk( mtx_);
        flag_ = true;
        lk.unlock();
        cnd_.notify_all();
    }
};
//]

//[launch
template< typename Fn >
boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) {
    boost::fibers::fiber fiber( func);
    priority_props & props( fiber.properties< priority_props >() );
    props.name = name;
    props.set_priority( priority);
    return fiber;
}
//]

void yield_fn() {
    std::string name( boost::this_fiber::properties< priority_props >().name);
    Verbose v( std::string("fiber ") + name);
    for ( int i = 0; i < 3; ++i) {
        std::cout << "fiber " << name << " yielding" << std::endl;
        boost::this_fiber::yield();
    }
}

void barrier_fn( boost::fibers::barrier & barrier) {
    std::string name( boost::this_fiber::properties< priority_props >().name);
    Verbose v( std::string("fiber ") + name);
    std::cout << "fiber " << name << " waiting on barrier" << std::endl;
    barrier.wait();
    std::cout << "fiber " << name << " yielding" << std::endl;
    boost::this_fiber::yield();
}

//[change_fn
void change_fn( boost::fibers::fiber & other,
                int other_priority,
                boost::fibers::barrier& barrier) {
    std::string name( boost::this_fiber::properties< priority_props >().name);
    Verbose v( std::string("fiber ") + name);

//<-
    std::cout << "fiber " << name << " waiting on barrier" << std::endl;
//->
    barrier.wait();
    // We assume a couple things about 'other':
    // - that it was also waiting on the same barrier
    // - that it has lower priority than this fiber.
    // If both are true, 'other' is now ready to run but is sitting in
    // priority_scheduler's ready queue. Change its priority.
    priority_props & other_props(
            other.properties< priority_props >() );
//<-
    std::cout << "fiber " << name << " changing priority of " << other_props.name
              << " to " << other_priority << std::endl;
//->
    other_props.set_priority( other_priority);
}
//]

//[main
int main( int argc, char *argv[]) {
    // make sure we use our priority_scheduler rather than default round_robin
    boost::fibers::use_scheduling_algorithm< priority_scheduler >();
/*=    ...*/
/*=}*/
//]
    Verbose v("main()");

    // for clarity
    std::cout << "main() setting name" << std::endl;
//[main_name
    boost::this_fiber::properties< priority_props >().name = "main";
//]
    std::cout << "main() running tests" << std::endl;

    {
        Verbose v("high-priority first", "stop\n");
        // verify that high-priority fiber always gets scheduled first
        boost::fibers::fiber low( launch( yield_fn, "low",    1) );
        boost::fibers::fiber med( launch( yield_fn, "medium", 2) );
        boost::fibers::fiber hi( launch( yield_fn,  "high",   3) );
        std::cout << "main: high.join()" << std::endl;
        hi.join();
        std::cout << "main: medium.join()" << std::endl;
        med.join();
        std::cout << "main: low.join()" << std::endl;
        low.join();
    }

    {
        Verbose v("same priority round-robin", "stop\n");
        // fibers of same priority are scheduled in round-robin order
        boost::fibers::fiber a( launch( yield_fn, "a", 0) );
        boost::fibers::fiber b( launch( yield_fn, "b", 0) );
        boost::fibers::fiber c( launch( yield_fn, "c", 0) );
        std::cout << "main: a.join()" << std::endl;
        a.join();
        std::cout << "main: b.join()" << std::endl;
        b.join();
        std::cout << "main: c.join()" << std::endl;
        c.join();
    }

    {
        Verbose v("barrier wakes up all", "stop\n");
        // using a barrier wakes up all waiting fibers at the same time
        boost::fibers::barrier barrier( 3);
        boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low",    1) );
        boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) );
        boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); },  "high",   3) );
        std::cout << "main: low.join()" << std::endl;
        low.join();
        std::cout << "main: medium.join()" << std::endl;
        med.join();
        std::cout << "main: high.join()" << std::endl;
        hi.join();
    }

    {
        Verbose v("change priority", "stop\n");
        // change priority of a fiber in priority_scheduler's ready queue
        boost::fibers::barrier barrier( 3);
        boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) );
        boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) );
        boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) );
        std::cout << "main: a.join()" << std::endl;
        std::cout << "main: a.join()" << std::endl;
        a.join();
        std::cout << "main: b.join()" << std::endl;
        b.join();
        std::cout << "main: c.join()" << std::endl;
        c.join();
    }

    std::cout << "done." << std::endl;

    return EXIT_SUCCESS;
}