{1 The Equeue, Unixqueue, and Engines HOWTO}
This document briefly explains what you can do with the
Equeue library. It is more superficial than {!Equeue_intro}, and
gives recipes for common tasks.
Contents:
- {!Equeue_howto.about}
- {!Equeue_howto.esys}
- {!Equeue_howto.engines}
- {!Equeue_howto.overview}
- {!Equeue_howto.tricks}
- {!Equeue_howto.lwt}
{2:about What is this about?}
We talk here about a form of concurrent programming, sometimes called
lightweight threading or cooperative threading. As with all concurrency
mechanisms, the ultimate goal is to do several things at the same
time. This type, however, focuses especially on I/O, because the
points where execution can switch between threads are usually the
points where a thread has to stop and wait for an I/O resource.
There is no preemption: While normal OCaml code is executed, there is
no possibility to switch the thread. Only when control
returns to the Equeue library can a different thread be selected for
execution.
There is also no parallelism: All execution happens in the context
of the process or kernel thread running the code. It is only possible
to use one CPU core.
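The following toy model may help to picture what "no preemption" means. It is plain OCaml using only the standard library, and it is {i not} how Equeue is implemented; the names [ready], [spawn], [worker] and [run] are made up for this sketch. Each task runs until it returns, and only then can the loop pick another one.

```ocaml
(* Toy model of cooperative scheduling. NOT the Equeue implementation;
   all names here are hypothetical. *)
let ready : (unit -> unit) Queue.t = Queue.create ()
let log = ref []

(* Register a piece of work that the loop will run later. *)
let spawn f = Queue.add f ready

(* A "thread" doing [n] steps. After each step it hands control back
   by re-registering its continuation instead of looping. *)
let rec worker name n () =
  if n > 0 then begin
    log := Printf.sprintf "%s%d" name n :: !log;
    spawn (worker name (n - 1))
  end

(* The loop is the only place where a switch between threads happens. *)
let run () =
  while not (Queue.is_empty ready) do
    (Queue.take ready) ()
  done

let () =
  spawn (worker "a" 2);
  spawn (worker "b" 2);
  run ();
  List.iter print_endline (List.rev !log)
```

The two workers interleave (the program prints [a2], [b2], [a1], [b1]), although everything runs in one kernel thread and no step is ever interrupted.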
Note that LWT is another popular (and younger) implementation of the
same idea for OCaml. It is possible to use LWT together with Ocamlnet,
but there are restrictions. We'll explain this later in this article.
{2:esys What are event systems?}
You will often see the type
{[ Unixqueue.event_system ]}
in signatures of functions. An event system bundles all resources that
are needed to run cooperative threads, like watched file descriptors,
handlers for file events, and timers. It is the common anchor point
for all activities that will happen concurrently:
- If you define several cooperative threads for the same [event_system],
it is possible to run them concurrently.
- You can have any number of [event_system] objects in your program.
However, once you attach cooperative threads to different event
systems, they cannot run together anymore.
- Having several event systems nevertheless makes sense in a number
of scenarios. For example, you could write a library function that
will do a number of I/O actions concurrently, but when all I/O
is finished, the function returns normally (and stops any concurrent
execution). In this case you would use a local event system that exists
for the lifetime of this function only.
- A more extreme model is to use only one event system for the whole
program. This, however, means that the whole program must follow
a programming style that is compatible with events.
The [event_system] object is usually passed on from one function call
to the next. There is no global event system. (NB. If you develop for
Netplex, there is a pseudo-global event system for every Netplex container.
But this just means that you can define your own global event system if
you need it for your application.)
{b How to create an event system:}
{[
let esys = Unixqueue.standard_event_system()
]}
An older name for the same function is [Unixqueue.create_unix_event_system].
There is also a second implementation which uses accelerated poll interfaces
if provided by the operating system (e.g. epoll on Linux):
{[
let esys = Unixqueue.performance_event_system()
]}
This, however, is only an advantage if you have hundreds of file
descriptors to observe.
{b How to attach actions to event systems:}
The abstraction of event systems defines an API allowing one to interact
with it. This is available in the {!Unixqueue} module. Normally, however,
you don't use this module directly, because it is {i very low-level}.
So, let's look directly at high-level interfaces. For example, the
{!Nethttp_client} uses event systems internally, and one can also control
this aspect of it. When creating an {!Nethttp_client.pipeline} object,
just set the event system to the one you want to use. This attaches
the whole HTTP protocol interpreter implemented by this object to the
event system:
{[
let pipeline = new Nethttp_client.pipeline
let () = pipeline # set_event_system esys
]}
Note that you can attach other pipelines and even unrelated, other I/O
actions to the same event system. This just means, as mentioned above,
that these actions are done concurrently.
The HTTP pipeline is initially empty, i.e. it does nothing. Before something
can happen, you need to program it, i.e. add tasks to do. For example:
{[
pipeline # add_with_callback
(new Nethttp_client.get "http://caml.inria.fr/")
(fun get -> ...)
]}
The [add_with_callback] method adds the HTTP task to run to the internal
queue. Also, there is a callback function which gets invoked when the
task is done.
If you enter the shown OCaml code into a toploop, you will notice that
no I/O occurs so far. Adding something to the internal task queue does not
yet trigger its execution. This is a deliberate feature of all
Equeue-based concurrency: You have the chance to set the machinery up
before it starts running.
This example showed how to deal with HTTP clients. What about other
network protocols? The scheme is always the same: The event system object
needs to be passed down to the protocol interpreter, either directly
at creation time, or directly after that.
{b How to run event systems}
The remaining question is now how to start the execution after everything
is set up. This is normally done with
{[
Unixqueue.run esys
]}
This single statement starts whatever action was previously configured,
and returns only when the action is completely finished. In our
example this means it covers the whole HTTP GET protocol.
You may change the setup while something is already happening.
For example, you could download a second HTTP file when the first is
done:
{[
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 ->
     pipeline # add_with_callback
       (new Nethttp_client.get "http://www.camlcity.org/")
       (fun get2 -> ...)
  )
]}
These "in-place" modifications of what to do are not only allowed at
points like the one shown, where a part of the action is already complete,
but at any point in time. For example, you can define a timer that
starts the other action, no matter how far the running action has
progressed:
{[
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> ...);
let g = Unixqueue.new_group esys in
Unixqueue.once esys g time
  (fun () ->
     pipeline # add_with_callback
       (new Nethttp_client.get "http://www.camlcity.org/")
       (fun get2 -> ...)
  )
]}
After [time] seconds the second download is started. (NB. What is the
purpose of [g]? A Unixqueue group can be used for cancelling all actions
associated to the group. In this case for cancelling the timer.)
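Timers are a convenient way to try this out without any network access. The following is a minimal sketch, assuming the Equeue library is linked (e.g. via the findlib package [equeue]); the names [g_cancel] and [log] are made up for illustration. It registers three timers, cancels one of them through its group, and only then runs the event system.

```ocaml
(* Sketch: timers on one event system, no network needed.
   Assumes Ocamlnet's equeue library is linked. *)
let esys = Unixqueue.standard_event_system ()
let g = Unixqueue.new_group esys
let g_cancel = Unixqueue.new_group esys   (* group we will cancel *)
let log = ref []

let () =
  (* Registering timers does not run anything yet. *)
  Unixqueue.once esys g 0.2 (fun () -> log := "0.2s timer" :: !log);
  Unixqueue.once esys g 0.1 (fun () -> log := "0.1s timer" :: !log);
  Unixqueue.once esys g_cancel 0.3 (fun () -> log := "cancelled" :: !log);
  (* Cancel everything attached to [g_cancel], here just one timer. *)
  Unixqueue.clear esys g_cancel;
  assert (!log = []);
  (* Returns when no timers or other resources are left. *)
  Unixqueue.run esys;
  List.iter print_endline (List.rev !log)
```

The timers fire in order of their delays, not in the order in which they were registered, and the cancelled one never fires.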
The HTTP client provides an API style where the completion of an
action is indicated to the user via a callback. This style is easy to
use for beginners, but it has a drawback: There is no uniform way
to compose elementary actions into more complex ones. Such
composition is possible as shown in the example, but it is always an
ad-hoc solution.
{b Recursion is your friend}
Let's have a look at such an ad-hoc composition: Assume we have
a list of URLs, and we want to download them with high concurrency.
Idea 1: We just add all URLs to the same pipeline, as in
{[
let count = ref 0
let () =
  List.iter
    (fun url ->
       pipeline # add_with_callback
         (new Nethttp_client.get url)
         (fun get ->
            decr count;
            if !count = 0 then ... (* do some followup action here *)
         );
       incr count
    )
    list
]}
and then run the [esys]. This works, but the "intelligence" of the
HTTP pipeline object is limited. If there are several files to
download from the same server, the pipeline manages to use
only a limited number of connections for this, and to serialize the
requests over these connections. There is, however, no built-in
mechanism that would limit the number of servers to contact at
once. If you had one million different servers in this list, the
pipeline would try to download from all servers concurrently. Of
course, this will fail for lack of system resources.
Idea 2: We only add a limited number of URLs at a time.
{[
let count = ref 0
let count_max = 10
let list = ref list
let rec maybe_next() =
  if !count < count_max then (
    match !list with
      | [] -> ()
      | url :: list' ->
          pipeline # add_with_callback
            (new Nethttp_client.get url)
            (fun get ->
               decr count;
               maybe_next();
               if !count = 0 then ... (* followup action *)
            );
          incr count;
          list := list';
          maybe_next()
  )
]}
Here we use recursion to encode the repetitive algorithm. This is the
mechanism of choice, because we need to continue the loop from the
callback function (an imperative loop construct could not do so).
Note that recursive calls from callback functions do not fill up the
stack, so you could do this endlessly without risking a stack
overflow.
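The throttling pattern can be tried without a network by replacing the download with a timer. This is only a sketch: [fake_get] is a hypothetical stand-in for [pipeline # add_with_callback], and [peak], [finished] and [todo] are names introduced here to make the effect observable. It assumes the Equeue library is linked.

```ocaml
(* Sketch of the throttling pattern with a simulated download. *)
let esys = Unixqueue.standard_event_system ()
let g = Unixqueue.new_group esys

(* Hypothetical stand-in for a download: call [callback] a bit later. *)
let fake_get url callback =
  Unixqueue.once esys g 0.01 (fun () -> callback url)

let count = ref 0        (* requests currently in flight *)
let count_max = 3
let peak = ref 0         (* highest value of [count] ever seen *)
let finished = ref []
let todo = ref ["u1"; "u2"; "u3"; "u4"; "u5"; "u6"; "u7"]

let rec maybe_next () =
  if !count < count_max then
    match !todo with
    | [] -> ()
    | url :: rest ->
        todo := rest;
        incr count;
        peak := max !peak !count;
        fake_get url
          (fun u ->
             (* Runs later, called from the event loop. *)
             decr count;
             finished := u :: !finished;
             maybe_next ());
        maybe_next ()

let () =
  maybe_next ();
  Unixqueue.run esys;
  Printf.printf "done: %d, peak concurrency: %d\n"
    (List.length !finished) !peak
```

All seven simulated requests complete, and at no time are more than [count_max] of them in flight.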
{b Trap: do not mix synchronous and event-based APIs}
Many protocol interpreters provide both styles of APIs: Conventional
synchronous APIs, and event-based APIs. The question is whether one
can mix them.
This is not possible, because the synchronous API is normally derived
from the event-based API by setting up a one-time action in the event
system and then running the event system. If you mixed the APIs,
an already running event system would be run a second time.
This is forbidden and raises an exception.
So: Use the same instance of the protocol interpreter either in a
synchronous way, or in an event-based way, but do not do both.
{2:engines What are engines?}
As we have seen, callbacks are a common way to notify the caller about
state changes. However, callbacks are too primitive to allow
systematic composition of actions. The abstraction of engines has been
developed to fill this gap. As a first approximation, imagine an
engine as a wrapped callback interface: a machinery which is executing
something concurrently until the result is available, with the
possibility of notifying users of the result.
Continuing the HTTP example, there is also an engine-based version of
adding a request to the pipeline:
{[
let e = pipeline # add_e (new Nethttp_client.get url)
]}
This is the same as [add_with_callback], except that the delivery mechanism
is different.
It is possible to attach a callback to an engine:
{[
Uq_engines.when_state
  ~is_done:(fun get -> ...)
  ~is_error:(fun ex -> ...)
  e
]}
The first function is called when the HTTP request could be processed
successfully, and the second one when a fatal error occurs (with [ex]
being the exception). Using {!Uq_engines.when_state}, every engine-style
interface can be turned into a callback-style interface. Of course, this
is not the primary idea of this abstraction, but this possibility means
that we can go to the lower level of callbacks whenever needed.
The three most important composition operators are available in
{!Uq_engines.Operators}. It is suggested that you [open] this module,
and use the operators directly:
- Sequence: With
{[ e1 ++ (fun r1 -> e2) ]}
the engine [e1] is executed first, and when it has computed the result
[r1], the engine [e2] is started. The result of [++] is again an engine,
so it is possible to concatenate several expressions:
{[ e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ... ]}
One can also set the parentheses differently if the previous results
are needed later:
{[ e1 ++ (fun r1 -> e2 ++ (fun r2 -> e3 ++ ... )) ]}
The [++] operator is also available as a normal function:
{!Uq_engines.seq_engine}
- Mapping: With
{[ e1 >> (fun st1 -> st2) ]}
one can map the final state of [e1] to a different state. The state
of the engine is either the computed value, the resulting exception,
or a tag indicating that the engine was aborted:
{[
e1 >>
  (function
    | `Done v -> ...
    | `Error ex -> ...
    | `Aborted -> ...
  )
]}
As you can also have access to exceptions, this construction can be
used to catch exceptions, and to transform them into normal values:
{[
e1 >>
  (function
    | `Done s -> `Done(Some s)
    | `Error Not_found -> `Done None
    | `Error ex -> `Error ex
    | `Aborted -> `Aborted
  )
]}
- Values as engines: If you just want to encapsulate an already existing
value [v] into an engine, use
{[ eps_e (`Done v) esys ]}
or more generally
{[ eps_e st esys ]}
to encapsulate any state. The [eps_e] makes an engine out of a value
by pretending that the value is computed in a single step (the
epsilon step).
Using this, the above example of downloading two files, one after the
other, looks like:
{[
let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 ->
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              ...;
              eps_e (`Done()) pipeline#event_system
           )
     )
]}
Note that the final result is here just [()], and it is transformed with
[eps_e] into an engine-compatible shape.
{b Getting results out of an engine-based algorithm}
As engines use event systems internally, the constructed complex
engine (like [e] in the previous example) is not started immediately,
but only when the event system runs (unless the event system is
already running). So you still finally need
{[
Unixqueue.run esys
]}
to fire up the prepared engines, and to wait for the result.
You may wonder how to access the result. In the previous example, the
result was just [()], so there is no interest in knowing it. But you
could also just return what you have got, as in
{[
let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 ->
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              eps_e (`Done(get1, get2)) pipeline#event_system
           )
     )
]}
and the question is how to get the pair [(get1,get2)] with the
downloaded files after [e] is finished. This is indeed very simple:
after [Unixqueue.run] returns, you can check for result values:
{[
let st = e # state
]}
Here, [st] can again have the values
- [`Done x] if the engine has a final value [x] (here our pair)
- [`Error e] if the engine execution ended in an exception [e]
- [`Aborted] if the engine was artificially stopped
but also
- [`Working n] if the engine is not yet done, and [n] is an integer
indicating the number of computation steps
The [`Working] state is only visible if you query the state directly with
[state] but not in the [>>] operator.
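The whole cycle (compose with [++], run, inspect the state) can be sketched without a network by using {!Uq_engines.delay_engine} as simulated I/O. The helper [later] is hypothetical and only introduced for this example; the sketch assumes the Equeue library is linked.

```ocaml
open Uq_engines.Operators

let esys = Unixqueue.standard_event_system ()

(* Hypothetical helper: an engine delivering [v] after [t] seconds. *)
let later t v =
  Uq_engines.delay_engine t (fun () -> eps_e (`Done v) esys) esys

(* Two simulated I/O steps in sequence; the result combines both. *)
let e =
  later 0.1 20
  ++ (fun r1 ->
        later 0.1 22
        ++ (fun r2 -> eps_e (`Done (r1 + r2)) esys))

let () =
  (* Nothing has been computed so far; this fires up the engines. *)
  Unixqueue.run esys;
  match e # state with
  | `Done n -> Printf.printf "result: %d\n" n
  | `Error ex -> print_endline ("error: " ^ Printexc.to_string ex)
  | `Aborted -> print_endline "aborted"
  | `Working _ -> print_endline "still working"
```

Here the nested form of [++] is used because the final step needs both [r1] and [r2].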
{b Forking and joining concurrent threads built with engines}
By concatenating elementary engines with [++] you basically create an
execution thread. As we are talking here about concurrent programming,
the question is how to fork a new thread off of an existing one, and
how to join again with the created thread once it is finished.
Forking is very simple: Just have several expressions, e.g.
{[
let e1 = <expression using ++>
let e2 = <expression using ++>
]}
Now [e1] and [e2] run concurrently.
For joining, use the function {!Uq_engines.sync_engine}:
{[
let e_joined =
Uq_engines.sync_engine e1 e2
]}
The engine [e_joined] is finished only when both [e1] and [e2] are
finished. The result value of [e_joined] is the pair of the results of
[e1] and [e2], e.g.
{[
e_joined ++ (fun (r1,r2) -> ...)
]}
There is also a version of [sync_engine] which can join any number of
engines: {!Uq_engines.msync_engine}. We use it in the engine-based version
of the download example:
{[
let count_max = 10
let download_e list =
  let list = ref list in
  let rec download_thread_e() =
    match !list with
      | [] -> eps_e (`Done ()) esys
      | url :: list' ->
          list := list';
          pipeline # add_e (new Nethttp_client.get url)
          ++ (fun get ->
                download_thread_e()
             ) in
  let rec fork_e k =
    if k < count_max then
      let e = download_thread_e() in
      e :: fork_e (k+1)
    else
      [] in
  let e_list = fork_e 0 in
  Uq_engines.msync_engine
    e_list
    (fun _ () -> ())
    ()
    esys
]}
The function [download_thread_e] downloads documents sequentially from the
HTTP servers. The URLs are fetched from the variable [list]. In order to
get concurrency, [count_max] of these threads are started by the
[fork_e] recursion. The result is [e_list], a list of all concurrently
running engines. Finally, [msync_engine] is used to wait until all of these
threads are finished. [msync_engine] works like a fold operator, and
aggregates the result values via the argument function. Here, the threads
only return [()] as results, so aggregation is pointless.
{2:overview Overview of library functions for engines}
{b In {!Uq_engines}:}
This module contains the basic definition of engines, {!Uq_engines.engine},
plus a number of combinators for engines:
- Sequence: {!Uq_engines.seq_engine}, also backing the [++] operator
- Mapping: {!Uq_engines.map_engine} and {!Uq_engines.fmap_engine}. The
latter is backing the [>>] operator
- Waiting on event: {!Uq_engines.signal_engine}. The engine stops and
waits until a [signal] function is called.
- Error handling: {!Uq_engines.meta_engine}. Errors are lifted into the
normal value space.
- Streaming: {!Uq_engines.stream_seq_engine}. Folding over a stream of
values ([Stream] module), and evaluation of the fold function as engine.
- Joining: {!Uq_engines.sync_engine} and {!Uq_engines.msync_engine}
- Delays: {!Uq_engines.delay_engine} suspends execution for n seconds
- Timeouts: {!Uq_engines.timeout_engine} gives an engine a maximum time
for computations, and if the time is exceeded the engine is aborted.
- Automatic serialization:
{!Uq_engines.serializer} forces the executions of an engine function
to be serialized: the next engine can start only when the previous one is
finished. {!Uq_engines.prioritizer} is an advanced version where the
waiting engines can be prioritized.
- Cache (lazy evaluation): {!Uq_engines.cache} obtains a lazily computed
value by running an engine.
Also, the module defines basic I/O engines:
- File descriptor polling: {!Uq_engines.poll_engine} waits until a
file descriptor is ready for an I/O operation
- Input and output: {!Uq_engines.input_engine} and {!Uq_engines.output_engine}
can be used to define I/O engines by wrapping [read] and [write]
{b In {!Uq_client}:}
- Connect a client: {!Uq_client.connect_e} creates a new socket and
connects it to a server
{b In {!Uq_server}:}
- Define a server: {!Uq_server.listener} creates a server socket
and allows you to process the accepted connections with engines.
{b In {!Uq_io}:}
This module contains functions for buffered I/O:
- Read data: {!Uq_io.input_e} reads data from a device
- Read data with fixed length: {!Uq_io.really_input_e}
- Read data line by line: {!Uq_io.input_line_e} and {!Uq_io.input_lines_e}
- Write data: {!Uq_io.output_e} writes data to a device
- Write data with fixed length: {!Uq_io.really_output_e}. There are also
variants for writing strings and buffers
- Write EOF: {!Uq_io.write_eof_e}
- Copy data between devices: {!Uq_io.copy_e}
A "device" is a file descriptor, a multiplex controller (see
{!Uq_engines.multiplex_controller}), or an asynchronous channel.
The definition of devices is extensible.
{b In {!Uq_transfer}:}
- Copying data: {!Uq_transfer.copier} copies data between file descriptors
without looking at the data
{b Other basics:}
- Name resolution (e.g. DNS): {!Uq_resolver}
- SOCKS proxies: {!Uq_socks5}
- Using event systems together with kernel threads: {!Uq_mt}
{b Protocol interpreters:}
- RPC clients: {!Rpc_client}
- RPC servers: {!Rpc_server}
- HTTP clients: {!Nethttp_client}
- HTTP servers: {!Nethttpd_engine}
- FTP clients: {!Netftp_client}
- Telnet clients: {!Nettelnet_client}
{b Definition hierarchy}
We have, top-to-bottom:
- Engines are the top-level abstraction. Essentially, they "only" provide
a notification mechanism for operations over event systems, but
exactly this makes them the easily composable units that are most useful
for constructing algorithms.
- Event systems are simply a uniform interface for event loops, and events
can be queued up in user space ([Equeue]).
- Pollsets ({!Netsys_pollset}) are event loops, i.e. they wait for
file descriptor conditions.
- Kernel interface ([select], [poll], [epoll] etc.)
{2:tricks Various tricks}
{b Aborting engines:} Unlike other types of threads, cooperative threads
can be aborted at any time. Use the [abort] method:
{[ e # abort() ]}
The engine [e] will then enter the [`Aborted] state as soon as possible.
Often this will happen immediately, but there are also engines where this
takes some time.
{b Exceptions:} There are several ways to signal exceptions. First, the
orderly way:
{[ eps_e (`Error x) esys ]}
This creates an engine that represents an exception as final state. Because
exceptions cannot always be handled that cleanly, the basic combinators like
[++] always catch exceptions, and represent these exceptions in their final
state. For example:
{[
e1 ++ (fun r1 -> raise x)
]}
The [++] operator catches the exception, and the state transitions to
[`Error x] (just as the [eps_e] example would do).
Note that this behavior relates to engines only. If you program event
systems directly, there will be no automatic exception handling. For
example
{[
Unixqueue.once esys g 1.0 (fun () -> raise x)
]}
does not catch [x] in any way. The effect is that the exception falls
through to the caller, which is always [Unixqueue.run].
{b How to jump out of event processing:} The just mentioned way of raising
an exception can be used to leave event processing. Just define
{[ exception Esys_exit ]}
and throw it like
{[
Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit)
]}
and catch it like
{[
try
Unixqueue.run esys
with
| Esys_exit -> ...
]}
This always works, even if the call of [Unixqueue.once] is inside an
engine expression.
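Put together as a complete program, this may look as follows. It is a sketch assuming the Equeue library is linked; the flags [fired] and [exited] are only there to make the effect visible.

```ocaml
exception Esys_exit

let esys = Unixqueue.standard_event_system ()
let g = Unixqueue.new_group esys
let fired = ref false
let exited = ref false

let () =
  (* This timer would fire after 10 seconds if processing continued. *)
  Unixqueue.once esys g 10.0 (fun () -> fired := true);
  (* This one fires as soon as the event system runs. *)
  Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit);
  (try
     Unixqueue.run esys
   with
     | Esys_exit -> exited := true);
  Printf.printf "exited early: %b, long timer fired: %b\n" !exited !fired
```

[Unixqueue.run] is left immediately through the exception, without waiting for the 10 second timer that is still registered.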
{2:lwt Engines and LWT}
Users who are familiar with LWT will certainly recognize many operations -
although they often have another name. (You may wonder why both implementations
exist - well, a longer story, but essentially Ocamlnet has older roots,
and at a certain stage of its development it only knew event systems but not
yet engines. The LWT developers saw this, and found this insufficient, and
developed LWT. Unfortunately, they did not base their new library on Ocamlnet,
but chose to reimplement the event loop core. In the meantime, the Ocamlnet
development fixed the deficiencies in their library. Now we have two good
libraries for the same range of problems.)
Compare:
- Wrap values: [eps_e] vs. [Lwt.return] and [Lwt.fail]
- Waiting: {!Uq_engines.signal_engine} vs. [Lwt.wait]
- Sequences: {!Uq_engines.seq_engine} vs. [Lwt.bind]
- Joining: {!Uq_engines.sync_engine} vs. [Lwt.join], [Lwt.choose], and
[Lwt.pick]
- Run until finished: {!Unixqueue.run} vs. [Lwt_main.run]
You should, however, be aware that there are some differences between
the libraries:
- Delayed vs immediate execution:
LWT threads are started immediately, and run until they sleep.
Only at this point do you need to call [Lwt_main.run]. In contrast,
Equeue prefers to start only at [Unixqueue.run] time.
- LWT allows you to create Lwt threads which are already terminated
([Lwt.return]).
In Equeue this is not possible: [eps_e] creates an engine which will terminate
as soon as possible and which results in a constant value. Effectively,
this means that [eps_e] is seen as point where threads can be switched,
whereas this is not the case for [Lwt.return]. This gives more
opportunities for switching, but there are also more subtle consequences,
like who is the caller of suspended functions. In Equeue it is always
the core of the event loop, whereas this is not uniform in LWT.
- In Equeue all engines can be aborted, whereas in LWT a special
abstraction needs to be used.
- In LWT there can be only one active event loop at a time, whereas
in Equeue there can be as many [event_system] objects as needed. This
is mainly a restriction for programs using kernel threads: In Equeue,
every thread can have its own [event_system], but doing the same in
LWT is impossible (incompatibility with kernel threads on this level).
The remaining question is how to use both facilities in the same program
(provided it is not multi-threaded, which rules LWT out). The library
{!Uq_lwt} helps here.
The idea is to replace the event loop built into LWT by the Ocamlnet
event loop. This is done for a given [esys] by
{[
class lwt_engine esys =
object
  inherit Lwt_engine.abstract
  inherit Uq_lwt.lwt_backend esys
end
let () = Lwt_engine.set (new lwt_engine esys)
]}
Additionally, it is required that you now always call [Lwt_main.run] for
starting event-based programs, and not [Unixqueue.run esys]. The latter
would not work for technical reasons. Of course, this means that the
main program will be LWT.
{b How to call engines from LWT threads}: Assume you are in some
LWT code, and want to call an engine-based function
[f : 'a -> 'b engine]. This can be achieved by first simply calling
the function, and then using an LWT primitive to LWT-wait until the
engine is done:
{[
let call_thread f x =
  let e = f x in
  let waiter, condition = Lwt.wait() in
  Uq_engines.when_state
    ~is_done:(fun r -> Lwt.wakeup condition r)
    ~is_error:(fun x -> Lwt.wakeup_exn condition x)
    e;
  waiter
]}
{b How to call LWT threads from engines:} The reverse for a function
[f : 'a -> 'b Lwt.thread]:
{[
let call_e f x =
  let thr = f x in
  (* signal_engine needs the event system the engine is attached to *)
  let waiter, signal = Uq_engines.signal_engine esys in
  Lwt.on_success thr (fun r -> signal (`Done r));
  Lwt.on_failure thr (fun x -> signal (`Error x));
  waiter
]}