1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
|
[/
Copyright Oliver Kowalke 2014.
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE_1_0.txt or copy at
http://www.boost.org/LICENSE_1_0.txt
]
[section:motivation Motivation]
In order to support a broad range of execution control behaviour the coroutine
types of __acoro__ can be used to ['escape-and-reenter] loops, to
['escape-and-reenter] recursive computations and for ['cooperative] multitasking
helping to solve problems in a much simpler and more elegant way than with only
a single flow of control.
[heading event-driven model]
The event-driven model is a programming paradigm where the flow of a program is
determined by events. The events are generated by multiple independent sources
and an event-dispatcher, waiting on all external sources, triggers callback
functions (event-handlers) whenever one of those events is detected (event-loop).
The application is divided into event selection (detection) and event handling.
[$../../../../libs/coroutine2/doc/images/event_model.png [align center]]
The resulting applications are highly scalable, flexible, have high
responsiveness and the components are loosely coupled. This makes the event-driven
model suitable for user interface applications, rule-based productions systems
or applications dealing with asynchronous I/O (for instance network servers).
[heading event-based asynchronous paradigm]
A classic synchronous console program issues an I/O request (e.g. for user
input or filesystem data) and blocks until the request is complete.
In contrast, an asynchronous I/O function initiates the physical operation but
immediately returns to its caller, even though the operation is not yet
complete. A program written to leverage this functionality does not block: it
can proceed with other work (including other I/O requests in parallel) while
the original operation is still pending. When the operation completes, the
program is notified. Because asynchronous applications spend less overall time
waiting for operations, they can outperform synchronous programs.
Events are one of the paradigms for asynchronous execution, but
not all asynchronous systems use events.
Although asynchronous programming can be done using threads, they come with
their own costs:
* hard to program (traps for the unwary)
* memory requirements are high
* large overhead with creation and maintenance of thread state
* expensive context switching between threads
The event-based asynchronous model avoids those issues:
* simpler because of the single stream of instructions
* much less expensive context switches
The downside of this paradigm consists in a sub-optimal program
structure. An event-driven program is required to split its code into
multiple small callback functions, i.e. the code is organized in a sequence of
small steps that execute intermittently. An algorithm that would usually be expressed
as a hierarchy of functions and loops must be transformed into callbacks. The
complete state has to be stored into a data structure while the control flow
returns to the event-loop.
As a consequence, event-driven applications are often tedious and confusing to
write. Each callback introduces a new scope, error callback etc. The
sequential nature of the algorithm is split into multiple callstacks,
making the application hard to debug. Exception handlers are restricted to
local handlers: it is impossible to wrap a sequence of events into a single
try-catch block.
The use of local variables, while/for loops, recursions etc. together with the
event-loop is not possible. The code becomes less expressive.
In the past, code using asio's ['asynchronous operations] was convoluted by
callback functions.
class session
{
public:
session(boost::asio::io_service& io_service) :
socket_(io_service) // construct a TCP-socket from io_service
{}
tcp::socket& socket(){
return socket_;
}
void start(){
// initiate asynchronous read; handle_read() is callback-function
socket_.async_read_some(boost::asio::buffer(data_,max_length),
boost::bind(&session::handle_read,this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
private:
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred){
if (!error)
// initiate asynchronous write; handle_write() is callback-function
boost::asio::async_write(socket_,
boost::asio::buffer(data_,bytes_transferred),
boost::bind(&session::handle_write,this,
boost::asio::placeholders::error));
else
delete this;
}
void handle_write(const boost::system::error_code& error){
if (!error)
// initiate asynchronous read; handle_read() is callback-function
socket_.async_read_some(boost::asio::buffer(data_,max_length),
boost::bind(&session::handle_read,this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
else
delete this;
}
boost::asio::ip::tcp::socket socket_;
enum { max_length=1024 };
char data_[max_length];
};
In this example, a simple echo server, the logic is split into three member
functions - local state (such as data buffer) is moved to member variables.
__boost_asio__ provides with its new ['asynchronous result] feature a new
framework combining event-driven model and coroutines, hiding the complexity
of event-driven programming and permitting the style of classic sequential code.
The application is not required to pass callback functions to asynchronous
operations and local state is kept as local variables. Therefore the code
is much easier to read and understand.
[footnote Christopher Kohlhoff,
[@ http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n3964.pdf
N3964 - Library Foundations for Asynchronous Operations, Revision 1]].
void session(boost::asio::io_service& io_service){
// construct TCP-socket from io_service
boost::asio::ip::tcp::socket socket(io_service);
try{
for(;;){
// local data-buffer
char data[max_length];
boost::system::error_code ec;
// read asynchronous data from socket
// execution context will be suspended until
// some bytes are read from socket
std::size_t length=socket.async_read_some(
boost::asio::buffer(data),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break; //connection closed cleanly by peer
else if(ec)
throw boost::system::system_error(ec); //some other error
// write some bytes asynchronously
boost::asio::async_write(
socket,
boost::asio::buffer(data,length),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break; //connection closed cleanly by peer
else if(ec)
throw boost::system::system_error(ec); //some other error
}
} catch(std::exception const& e){
std::cerr<<"Exception: "<<e.what()<<"\n";
}
}
In contrast to the previous example this one gives the impression of sequential
code and local data (['data]) while using asynchronous operations
(['async_read()], ['async_write()]). The algorithm is implemented in one
function and error handling is done by one try-catch block.
[heading recursive descent parsing]
Coroutines let you invert the flow of control so you can ask a recursive descent
parser for parsed symbols.
class Parser{
char next;
std::istream& is;
std::function<void(char)> cb;
char pull(){
return std::char_traits<char>::to_char_type(is.get());
}
void scan(){
do{
next=pull();
}
while(isspace(next));
}
public:
Parser(std::istream& is_,std::function<void(char)> cb_) :
next(), is(is_), cb(cb_)
{}
void run() {
scan();
E();
}
private:
void E(){
T();
while (next=='+'||next=='-'){
cb(next);
scan();
T();
}
}
void T(){
S();
while (next=='*'||next=='/'){
cb(next);
scan();
S();
}
}
void S(){
if (std::isdigit(next)){
cb(next);
scan();
}
else if(next=='('){
cb(next);
scan();
E();
if (next==')'){
cb(next);
scan();
}else{
throw parser_error();
}
}
else{
throw parser_error();
}
}
};
typedef boost::coroutines2::coroutine< char > coro_t;
int main() {
std::istringstream is("1+1");
// invert control flow
coro_t::pull_type seq(
boost::coroutines2::fixedsize_stack(),
[&is](coro_t::push_type & yield) {
// create parser with callback function
Parser p( is,
[&yield](char ch){
// resume user-code
yield(ch);
});
// start recursive parsing
p.run();
});
// user-code pulls parsed data from parser
// invert control flow
for(char c:seq){
printf("Parsed: %c\n",c);
}
}
This problem does not map at all well to communicating between independent
threads. It makes no sense for either side to proceed independently of the
other. You want them to pass control back and forth.
There's yet another advantage to using coroutines. This recursive descent parser
throws an exception when parsing fails. With a coroutine implementation, you
need only wrap the calling code in try/catch.
With communicating threads, you would have to arrange to catch the exception
and pass along the exception pointer on the same queue you're using to deliver
the other events. You would then have to rethrow the exception to unwind the
recursive document processing.
The coroutine solution maps very naturally to the problem space.
[heading 'same fringe' problem]
The advantages of suspending at an arbitrary call depth can be seen
particularly clearly with the use of a recursive function, such as traversal
of trees.
If traversing two different trees in the same deterministic order produces the
same list of leaf nodes, then both trees have the same fringe.
[$../../../../libs/coroutine2/doc/images/same_fringe.png [align center]]
Both trees in the picture have the same fringe even though the structure of the
trees is different.
The same fringe problem could be solved using coroutines by iterating over the
leaf nodes and comparing this sequence via ['std::equal()]. The range of data
values is generated by function ['traverse()] which recursively traverses the
tree and passes each node's data value to its __push_coro__.
__push_coro__ suspends the recursive computation and transfers the data value to
the main execution context.
__pull_coro_it__, created from __pull_coro__, steps over those data values and
delivers them to ['std::equal()] for comparison. Each increment of
__pull_coro_it__ resumes ['traverse()]. Upon return from
['iterator::operator++()], either a new data value is available, or tree
traversal is finished (iterator is invalidated).
In effect, the coroutine iterator presents a flattened view of the recursive
data structure.
struct node{
typedef std::shared_ptr<node> ptr_t;
// Each tree node has an optional left subtree,
// an optional right subtree and a value of its own.
// The value is considered to be between the left
// subtree and the right.
ptr_t left,right;
std::string value;
// construct leaf
node(const std::string& v):
left(),right(),value(v)
{}
// construct nonleaf
node(ptr_t l,const std::string& v,ptr_t r):
left(l),right(r),value(v)
{}
static ptr_t create(const std::string& v){
return ptr_t(new node(v));
}
static ptr_t create(ptr_t l,const std::string& v,ptr_t r){
return ptr_t(new node(l,v,r));
}
};
node::ptr_t create_left_tree_from(const std::string& root){
/* --------
root
/ \
b e
/ \
a c
-------- */
return node::create(
node::create(
node::create("a"),
"b",
node::create("c")),
root,
node::create("e"));
}
node::ptr_t create_right_tree_from(const std::string& root){
/* --------
root
/ \
a d
/ \
c e
-------- */
return node::create(
node::create("a"),
root,
node::create(
node::create("c"),
"d",
node::create("e")));
}
typedef boost::coroutines2::coroutine<std::string> coro_t;
// recursively walk the tree, delivering values in order
void traverse(node::ptr_t n,
coro_t::push_type& out){
if(n->left) traverse(n->left,out);
out(n->value);
if(n->right) traverse(n->right,out);
}
// evaluation
{
node::ptr_t left_d(create_left_tree_from("d"));
coro_t::pull_type left_d_reader([&](coro_t::push_type & out){
traverse(left_d,out);
});
node::ptr_t right_b(create_right_tree_from("b"));
coro_t::pull_type right_b_reader([&](coro_t::push_type & out){
traverse(right_b,out);
});
std::cout << "left tree from d == right tree from b? "
<< std::boolalpha
<< std::equal(begin(left_d_reader),
end(left_d_reader),
begin(right_b_reader))
<< std::endl;
}
{
node::ptr_t left_d(create_left_tree_from("d"));
coro_t::pull_type left_d_reader([&](coro_t::push_type & out){
traverse(left_d,out);
});
node::ptr_t right_x(create_right_tree_from("x"));
coro_t::pull_type right_x_reader([&](coro_t::push_type & out){
traverse(right_x,out);
});
std::cout << "left tree from d == right tree from x? "
<< std::boolalpha
<< std::equal(begin(left_d_reader),
end(left_d_reader),
begin(right_x_reader))
<< std::endl;
}
std::cout << "Done" << std::endl;
output:
left tree from d == right tree from b? true
left tree from d == right tree from x? false
Done
[heading chaining coroutines]
This code shows how coroutines could be chained.
typedef boost::coroutines2::coroutine<std::string> coro_t;
// deliver each line of input stream to sink as a separate string
void readlines(coro_t::push_type& sink,std::istream& in){
std::string line;
while(std::getline(in,line))
sink(line);
}
void tokenize(coro_t::push_type& sink, coro_t::pull_type& source){
// This tokenizer doesn't happen to be stateful: you could reasonably
// implement it with a single call to push each new token downstream. But
// I've worked with stateful tokenizers, in which the meaning of input
// characters depends in part on their position within the input line.
for(std::string line:source){
std::string::size_type pos=0;
while(pos<line.length()){
if(line[pos]=='"'){
std::string token;
++pos; // skip open quote
while(pos<line.length()&&line[pos]!='"')
token+=line[pos++];
++pos; // skip close quote
sink(token); // pass token downstream
} else if (std::isspace(line[pos])){
++pos; // outside quotes, ignore whitespace
} else if (std::isalpha(line[pos])){
std::string token;
while (pos < line.length() && std::isalpha(line[pos]))
token += line[pos++];
sink(token); // pass token downstream
} else { // punctuation
sink(std::string(1,line[pos++]));
}
}
}
}
void only_words(coro_t::push_type& sink,coro_t::pull_type& source){
for(std::string token:source){
if (!token.empty() && std::isalpha(token[0]))
sink(token);
}
}
void trace(coro_t::push_type& sink, coro_t::pull_type& source){
for(std::string token:source){
std::cout << "trace: '" << token << "'\n";
sink(token);
}
}
struct FinalEOL{
~FinalEOL(){
std::cout << std::endl;
}
};
void layout(coro_t::pull_type& source,int num,int width){
// Finish the last line when we leave by whatever means
FinalEOL eol;
// Pull values from upstream, lay them out 'num' to a line
for (;;){
for (int i = 0; i < num; ++i){
// when we exhaust the input, stop
if (!source) return;
std::cout << std::setw(width) << source.get();
// now that we've handled this item, advance to next
source();
}
// after 'num' items, line break
std::cout << std::endl;
}
}
// For example purposes, instead of having a separate text file in the
// local filesystem, construct an istringstream to read.
std::string data(
"This is the first line.\n"
"This, the second.\n"
"The third has \"a phrase\"!\n"
);
{
std::cout << "\nfilter:\n";
std::istringstream infile(data);
coro_t::pull_type reader(std::bind(readlines, _1, std::ref(infile)));
coro_t::pull_type tokenizer(std::bind(tokenize, _1, std::ref(reader)));
coro_t::pull_type filter(std::bind(only_words, _1, std::ref(tokenizer)));
coro_t::pull_type tracer(std::bind(trace, _1, std::ref(filter)));
for(std::string token:tracer){
// just iterate, we're already pulling through tracer
}
}
{
std::cout << "\nlayout() as coroutine::push_type:\n";
std::istringstream infile(data);
coro_t::pull_type reader(std::bind(readlines, _1, std::ref(infile)));
coro_t::pull_type tokenizer(std::bind(tokenize, _1, std::ref(reader)));
coro_t::pull_type filter(std::bind(only_words, _1, std::ref(tokenizer)));
coro_t::push_type writer(std::bind(layout, _1, 5, 15));
for(std::string token:filter){
writer(token);
}
}
{
std::cout << "\nfiltering output:\n";
std::istringstream infile(data);
coro_t::pull_type reader(std::bind(readlines,_1,std::ref(infile)));
coro_t::pull_type tokenizer(std::bind(tokenize,_1,std::ref(reader)));
coro_t::push_type writer(std::bind(layout,_1,5,15));
// Because of the symmetry of the API, we can use any of these
// chaining functions in a push_type coroutine chain as well.
coro_t::push_type filter(std::bind(only_words,std::ref(writer),_1));
for(std::string token:tokenizer){
filter(token);
}
}
[endsect]
|