File: equeue_howto.txt

{1 The Equeue, Unixqueue, and Engines HOWTO}

This document briefly explains what you can do with the
Equeue library. It is less in-depth than {!Equeue_intro}, and
gives some recipes for how to do things.

Contents:

 - {!Equeue_howto.about}
 - {!Equeue_howto.esys}
 - {!Equeue_howto.engines}
 - {!Equeue_howto.overview}
 - {!Equeue_howto.tricks}
 - {!Equeue_howto.lwt}


{2:about What is this about?}

We talk here about a form of concurrent programming, sometimes called
lightweight threading or cooperative threading. As with all concurrency
mechanisms, the ultimate goal is to do several things at
once. This type, however, focuses especially on I/O, because the
points where execution can switch between threads are usually the
points where a thread has to stop and wait for an I/O resource.

There is no preemption: while normal OCaml code is executing, there is
no possibility to switch threads. Only when control returns
to the Equeue library can a different thread be selected for
execution.

There is also no parallelism: All execution happens in the context
of the process or kernel thread running the code. It is only possible
to use one CPU core.

Note that LWT is another popular (and younger) implementation of the
same idea for OCaml. It is possible to use LWT together with Ocamlnet,
but there are restrictions. We'll explain this later in this article.

{2:esys What are event systems?}

You will often see the type

{[ Unixqueue.event_system ]}

in signatures of functions. An event system bundles all resources that
are needed to run cooperative threads, like watched file descriptors,
handlers for file events, and timers. It is the common anchor point
for all activities that will happen concurrently:

 - If you define several cooperative threads for the same [event_system],
   it is possible to run them concurrently.
 - You can have any number of [event_system] objects in your program.
   However, once you attach cooperative threads to different event
   systems, they cannot run together anymore.
 - Having several event systems nevertheless makes sense in a number
   of scenarios. For example, you could write a library function that
   performs a number of I/O actions concurrently, but when all I/O
   is finished, the function returns normally (and stops any concurrent
   execution). In this case you would use a local event system that exists
   only for the lifetime of this function.
 - A more extreme model is to use only one event system for the whole
   program. This, however, means that the whole program must follow
   a programming style that is compatible with events.

The [event_system] object is usually passed on from one function call
to the next. There is no global event system. (NB. If you develop for
Netplex, there is a pseudo-global event system for every Netplex container.
But this just means that you can define your own global event system if
you need it for your application.)

{b How to create an event system:}

{[
let esys = Unixqueue.standard_event_system()
]}

An older name for the same function is [Unixqueue.create_unix_event_system].

There is also a second implementation which uses accelerated poll interfaces
if provided by the operating system (e.g. epoll on Linux):

{[
let esys = Unixqueue.performance_event_system()
]}

This, however, is only an advantage if you have hundreds of file
descriptors to observe.
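
Coming back to the "local event system" scenario mentioned above, here is
a minimal sketch. [set_up_actions] is a hypothetical helper that attaches
the concurrent I/O actions; [Unixqueue.run] is explained below:

{[
(* Sketch: the concurrency is invisible to the caller. The function
   returns only when all attached actions are finished. *)
let do_io_concurrently set_up_actions =
  let esys = Unixqueue.standard_event_system() in
  set_up_actions esys;   (* attach the concurrent actions *)
  Unixqueue.run esys     (* run them; returns when everything is done *)
]}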

{b How to attach actions to event systems:}

Event systems define an API that allows one to interact
with them. It is available in the {!Unixqueue} module. Normally, however,
you don't use this module directly, because it is {i very low-level}.

So, let's look directly at the high-level interfaces. For example, the
HTTP client in {!Nethttp_client} uses event systems internally, and one
can also control this aspect of it. When creating a
{!Nethttp_client.pipeline} object,
just set the event system to the one you want to use. This attaches
the whole HTTP protocol interpreter implemented by this object to the
event system:

{[
let pipeline = new Nethttp_client.pipeline
let () = pipeline # set_event_system esys
]}

Note that you can attach other pipelines and even unrelated, other I/O
actions to the same event system. This just means, as mentioned above,
that these actions are done concurrently.

The HTTP pipeline is initially empty, i.e. it does nothing. Before something
can happen, you need to program it, i.e. add tasks to do. For example:

{[
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get -> ...)
]}

The [add_with_callback] method adds the HTTP task to the internal
queue. The callback function is invoked when the
task is done.

If you enter the OCaml code shown above into a toploop, you will notice
that no I/O occurs so far. Adding something to the internal task queue
does not yet trigger its execution. This is a deliberate feature of all
Equeue-based concurrency: you have the chance to set the machinery up
before it starts running.

This example showed how to deal with HTTP clients. What about other
network protocols? The scheme is always the same: The event system object
needs to be passed down to the protocol interpreter, either directly
at creation time, or right after that.

{b How to run event systems:}

The remaining question is now how to start the execution after everything
is set up. This is normally done with

{[
Unixqueue.run esys
]}

This single statement starts whatever action was previously configured,
and returns only when the action is completely finished. In our
example this means it covers the whole HTTP GET protocol.
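
Putting the pieces together, a complete minimal program could look like
the following sketch. The [status] and [response_body] methods belong to
the call API of {!Nethttp_client}; treat the details as an approximation:

{[
(* Sketch: create the event system, attach a pipeline, queue one GET
   request, and run until it is finished. *)
let () =
  let esys = Unixqueue.standard_event_system() in
  let pipeline = new Nethttp_client.pipeline in
  pipeline # set_event_system esys;
  pipeline # add_with_callback
    (new Nethttp_client.get "http://caml.inria.fr/")
    (fun get ->
       match get # status with
       | `Successful -> print_endline (get # response_body # value)
       | _           -> prerr_endline "download failed");
  Unixqueue.run esys   (* all I/O happens here *)
]}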

It is allowed to modify the scene while something is already happening.
For example, you could download a second HTTP file when the first is
done:

{[
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> 
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )
]}

These "in-place" modifications of what to do are not only allowed at
points like the shown where a part of the action is already complete,
but at any point in time. For example, you can define a timer that
starts the other action, no matter at which point of execution the
running action currently is:

{[
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> ...);

let g = Unixqueue.new_group esys in
Unixqueue.once esys g time
  (fun () ->
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )
]}

After [time] seconds the second download is started. (NB. What is the
purpose of [g]? A Unixqueue group can be used for cancelling all actions
associated with the group - in this case, for cancelling the timer.)
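
If you change your mind before the timer fires, you can cancel it by
clearing the group:

{[
(* Removes all events, handlers and resources of group [g] from the
   event system - here, the pending timer. *)
Unixqueue.clear esys g
]}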

The HTTP client provides an API style where the completion of an
action is signalled to the user via a callback. This style is easy to
use for beginners, but it has a drawback: there is no uniform way
to compose elementary actions into more complex ones. Such
composition is possible, as shown in the example, but it is always an
ad-hoc solution.

{b Recursion is your friend}

Let's have a look at such an ad-hoc composition: assume we have
a list of URLs, and we want to download them with high concurrency.

Idea 1: We just add all URLs to the same pipeline, as in

{[
let count = ref 0
List.iter
  (fun url ->
    pipeline # add_with_callback
      (new Nethttp_client.get url)
      (fun get ->
        decr count;
        if !count = 0 then ... (* do some followup action here *)
      );
    incr count
  )
  list
]}

and then run the [esys]. This works, but the "intelligence" of the
HTTP pipeline object is limited. If there are several files to
download from the same server, the pipeline manages to use
only a limited number of connections, and serializes the
requests over these connections. There is, however, no built-in
mechanism that would limit the number of servers contacted at
once. If you had one million different servers in this list, the
pipeline would try to download from all servers concurrently. Of
course, this will fail (for lack of system resources).

Idea 2: We only add a limited number of URLs at a time.

{[
let count = ref 0
let count_max = 10
let list = ref list

let rec maybe_next() =
  if !count < count_max then (
    match !list with
    | [] -> ()
    | url :: list' ->
        pipeline # add_with_callback
          (new Nethttp_client.get url)
          (fun get ->
             decr count;
             maybe_next();
             if !count = 0 then ...  (* followup action *)
          );
        incr count;
        list := list';
        maybe_next()
  )

let () = maybe_next()   (* kick off the first downloads *)
]}

We use recursion here to encode the repetitive algorithm. This is the
mechanism of choice, because we need to continue the loop from the
callback function (an imperative loop construct could not do so).

Note that recursive calls from callback functions do not fill up the
stack: the callback is invoked from the event loop, long after the
enclosing call of [maybe_next] has returned, so the calls do not nest.
You could do this endlessly without risking a stack overflow.

{b Trap: do not mix synchronous and event-based APIs}

Many protocol interpreters provide both styles of APIs: Conventional
synchronous APIs, and event-based APIs. The question is whether one
can mix them.

This is not possible, because the synchronous API is normally derived
from the event-based API by setting up a one-time action in the event
system and then running the event system. If you mixed the APIs,
an already running event system would be run a second time.
This is forbidden, though, and raises an exception.

So: Use the same instance of the protocol interpreter either in a
synchronous way, or in an event-based way, but do not do both.

{2:engines What are engines?}

As we have seen, callbacks are a common way to notify the caller about
state changes. However, callbacks are too primitive to allow
systematic composition of actions. The abstraction of engines has been
developed to fill this gap. As a first approximation, imagine an
engine as a wrapped callback interface: a machine that executes
something concurrently until the result is available, with the
possibility of notifying users of the result.

Continuing the HTTP example, there is also an engine-based version of
adding a request to the pipeline:

{[
let e = pipeline # add_e (new Nethttp_client.get url)
]}

This is the same as [add_with_callback], except that the delivery
mechanism for the result is different.

It is possible to attach a callback to an engine:

{[
Uq_engines.when_state
  ~is_done:(fun () -> ...)
  ~is_error:(fun ex -> ...)
  e
]}

The first function is called when the HTTP request could be processed
successfully, and the second one when a fatal error occurs (with [ex]
being the exception). Using {!Uq_engines.when_state}, every engine-style
interface can be turned into a callback-style interface. Of course, this
is not the primary idea of this abstraction, but this possibility means
that we can go to the lower level of callbacks whenever needed.

The three most important composition operators are available in
{!Uq_engines.Operators}. It is suggested that you [open] this module,
and use the operators directly:

 - Sequence: With
   {[ e1 ++ (fun r1 -> e2) ]}
   the engine [e1] is executed first, and when it has computed the result
   [r1], the engine [e2] is started. The result of [++] is again an engine,
   so it is possible to concatenate several expressions:
   {[ e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ... ]}
   One can also set the parentheses differently if the previous results
   are needed later:
   {[ e1 ++ (fun r1 -> e2 ++ (fun r2 -> e3 ++ ... )) ]}
   The [++] operator is also available as normal function:
   {!Uq_engines.seq_engine}
 - Mapping: With
   {[ e1 >> (fun st1 -> st2) ]}
   one can map the final state of [e1] to a different state. The state
   of the engine is either the computed value, the resulting exception,
   or the tag that the engine was aborted:
   {[
     e1 >>
       (function
         | `Done v -> ...
         | `Error ex -> ...
         | `Aborted -> ...
       )
   ]}
   As you can also have access to exceptions, this construction can be
   used to catch exceptions, and to transform them into normal values:
   {[
     e1 >>
       (function
         | `Done s          -> `Done(Some s)
         | `Error Not_found -> `Done None
         | `Error ex        -> `Error ex
         | `Aborted         -> `Aborted
       )
   ]}
 - Values as engines: If you just want to encapsulate an already existing
   value [v] into an engine, use
   {[ eps_e (`Done v) esys ]}
   or more generally
   {[ eps_e st esys ]}
   to encapsulate any state. The [eps_e] makes an engine out of a value
   by pretending that the value is computed in a single step (the
   epsilon step).

Using this, the above example of downloading two files, one after the
other, looks like:

{[
let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 -> 
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              ...;
              eps_e (`Done()) pipeline#event_system
           )
     )
]}

Note that the final result is here just [()], and it is transformed with
[eps_e] into an engine-compatible shape.

{b Getting results out of an engine-based algorithm}

As engines use event systems internally, the constructed complex
engine (like [e] in the previous example) is not immediately started;
it only starts when the event system runs (unless the event system is
already running). So you still finally need

{[
Unixqueue.run esys
]}

to fire up the prepared engines, and to wait for the result.

You may wonder how to access the result. In the previous example, the
result was just [()], so there is no interest in knowing it. But you
could also just return what you have got, as in

{[
let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 -> 
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              eps_e (`Done(get1, get2)) pipeline#event_system
           )
     )
]}

and the question is how to get the pair [(get1,get2)] with the
downloaded files after [e] is finished. This is indeed very simple -
after [Unixqueue.run] returns, you can check for result values:

{[
let st = e # state
]}

Here, [st] can again have the values

 - [`Done x] if the engine has a final value [x] (here our pair)
 - [`Error e] if the engine execution ended in an exception [e]
 - [`Aborted] if the engine was artificially stopped

but also

  - [`Working n] if the engine is not yet done, and [n] is an integer
    indicating the number of computation steps

The [`Working] state is only visible if you query the state directly
with [state]; it is never passed to the [>>] operator.

{b Forking and joining concurrent threads built with engines}

By concatenating elementary engines with [++] you basically create an
execution thread. As we are talking here about concurrent programming,
the question is how to fork a new thread off of an existing one, and
how to join again with the created thread once it is finished.

Forking is very simple: Just have several expressions, e.g.

{[
let e1 = <expression using ++>
let e2 = <expression using ++>
]}

Now [e1] and [e2] run concurrently.

For joining, use the function {!Uq_engines.sync_engine}:

{[
let e_joined =
  Uq_engines.sync_engine e1 e2
]}

The engine [e_joined] is only finished when both [e1] and [e2] are
finished. The result value of [e_joined] is the pair of the results of
[e1] and [e2], e.g.

{[
e_joined ++ (fun (r1,r2) -> ...)
]}

There is also a version of [sync_engine] which can join any number of
engines: {!Uq_engines.msync_engine}. We use it in the engine-based version
of the download example:

{[
let count_max = 10

let download_e list =
  let list = ref list in

  let rec download_thread_e() =
    match !list with
    | [] -> eps_e (`Done ()) esys
    | url :: list' ->
        list := list';
        pipeline # add_e (new Nethttp_client.get url)
        ++ (fun get ->
              download_thread_e()
           ) in

  let rec fork_e k =
    if k < count_max then
      let e = download_thread_e() in
      e :: fork_e (k+1)
    else
      [] in

  let e_list = fork_e 0 in

  Uq_engines.msync_engine
    e_list
    (fun _ () -> ())
    ()
    esys
]}

The function [download_thread_e] downloads documents sequentially from the
HTTP servers. The URLs are fetched from the variable [list]. In order to
get concurrency, [count_max] of these threads are started by the
[fork_e] recursion. The result is [e_list], a list of all concurrently
running engines. Finally, [msync_engine] is used to wait until all of these
threads are finished. [msync_engine] works like a fold operator, and
aggregates the result values via the argument function. Here, the threads
only return [()], so the aggregation is trivial.
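
To actually run the whole thing and inspect the outcome, you could write
(a sketch; [urls] stands for your list of URLs):

{[
let () =
  let e = download_e urls in
  Unixqueue.run esys;
  (* after [run] returns, the joined engine has reached a final state *)
  match e # state with
  | `Done ()   -> print_endline "all downloads finished"
  | `Error x   -> prerr_endline ("failed: " ^ Printexc.to_string x)
  | `Aborted   -> prerr_endline "aborted"
  | `Working _ -> ()   (* not expected after [run] returns *)
]}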

{2:overview Overview of library functions for engines}

{b In {!Uq_engines}:}

This module contains the basic definition of engines, {!Uq_engines.engine},
plus a number of combinators for engines:

 - Sequence: {!Uq_engines.seq_engine}, which also backs the [++] operator
 - Mapping: {!Uq_engines.map_engine} and {!Uq_engines.fmap_engine}. The
   latter backs the [>>] operator
 - Waiting on an event: {!Uq_engines.signal_engine}. The engine stops and
   waits until a [signal] function is called.
 - Error handling: {!Uq_engines.meta_engine}. Errors are lifted into the
   normal value space.
 - Streaming: {!Uq_engines.stream_seq_engine}. Folds over a stream of
   values ([Stream] module), evaluating the fold function as an engine.
 - Joining: {!Uq_engines.sync_engine} and {!Uq_engines.msync_engine}
 - Delays: {!Uq_engines.delay_engine} suspends the execution for a given
   number of seconds
 - Timeouts: {!Uq_engines.timeout_engine} gives an engine a maximum time
   for computations; if the time is exceeded, the engine is aborted
   (see the sketch after this list).
 - Automatic serialization:
   {!Uq_engines.serializer} forces that an engine function
   is serialized - the next engine can only start when the previous one is
   finished. {!Uq_engines.prioritizer} is an advanced version where the
   waiting engines can be prioritized.
 - Cache (lazy evaluation): {!Uq_engines.cache} obtains a lazily computed
   value by running an engine.
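
For instance, delays and timeouts compose like any other engine. A small
sketch, assuming [pipeline], [url] and [esys] from the earlier examples:

{[
exception Download_timeout

(* Wait 5 seconds, then start the download; abort the combined engine
   with [Download_timeout] if it takes longer than 60 seconds: *)
let e =
  Uq_engines.timeout_engine 60.0 Download_timeout
    (Uq_engines.delay_engine 5.0
       (fun () -> pipeline # add_e (new Nethttp_client.get url))
       esys)
]}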

Also, the module defines basic I/O engines:

 - File descriptor polling: {!Uq_engines.poll_engine} waits until a
   file descriptor is ready for an I/O operation
 - Input and output: {!Uq_engines.input_engine} and {!Uq_engines.output_engine}
   can be used to define I/O engines by wrapping [read] and [write]

{b In {!Uq_client}:}

 - Connect a client: {!Uq_client.connect_e} creates a new socket and
   connects it to a server
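
For example, connecting to a TCP server could look like the following
sketch. The address constructors and [client_endpoint] reflect my reading
of the {!Uq_client} API; treat the details as an approximation
(requires [open Uq_engines.Operators] for [++]):

{[
(* Sketch: connect to www.example.com:80 and obtain the connected
   file descriptor. *)
let e =
  Uq_client.connect_e
    (`Socket(`Sock_inet_byname(Unix.SOCK_STREAM, "www.example.com", 80),
             Uq_client.default_connect_options))
    esys
  ++ (fun status ->
        let fd = Uq_client.client_endpoint status in
        ... (* talk to the server via [fd] *)
     )
]}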

{b In {!Uq_server}:}

 - Define a server: {!Uq_server.listener} creates a server socket
   and allows one to process the accepted connections with engines.

{b In {!Uq_io}:}

This module contains functions for buffered I/O:

 - Read data: {!Uq_io.input_e} reads data from a device
 - Read data with fixed length: {!Uq_io.really_input_e}
 - Read data line by line: {!Uq_io.input_line_e} and {!Uq_io.input_lines_e}
 - Write data: {!Uq_io.output_e} writes data to a device
 - Write data with fixed length: {!Uq_io.really_output_e}. There are also
   variants for writing strings and buffers
 - Write EOF: {!Uq_io.write_eof_e}
 - Copy data between devices: {!Uq_io.copy_e}

A "device" is a file descriptor, a multiplex controller (see
{!Uq_engines.multiplex_controller}), or an asynchronous channel.
The definition of devices is extensible.
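
A small sketch of reading one line from a file descriptor with these
functions (wrapping the descriptor as a buffered device; the device
constructors reflect my reading of the {!Uq_io} API):

{[
(* Sketch: wrap a file descriptor as a device, put an input buffer on
   top of it, and read one line as an engine delivering a string. *)
let read_line_e esys fd =
  let dev = `Polldescr(Netsys.get_fd_style fd, fd, esys) in
  let buf = Uq_io.create_in_buffer dev in
  Uq_io.input_line_e (`Buffer_in buf)
]}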

{b In {!Uq_transfer}:}

 - Copying data: {!Uq_transfer.copier} copies data between file descriptors
   without looking at the data


{b Other basics:}

 - Name resolution (e.g. DNS): {!Uq_resolver}
 - SOCKS proxies: {!Uq_socks5}
 - Using event systems together with kernel threads: {!Uq_mt}

{b Protocol interpreters:}

 - RPC clients: {!Rpc_client}
 - RPC servers: {!Rpc_server}
 - HTTP clients: {!Nethttp_client}
 - HTTP servers: {!Nethttpd_engine}
 - FTP clients: {!Netftp_client}
 - Telnet clients: {!Nettelnet_client}

{b Definition hierarchy}

We have, top-to-bottom:

 - Engines are the top-level abstraction. Essentially, they "only" provide
   a notification mechanism for operations over event systems, but
   exactly this makes them the easily composable units that are most useful
   for constructing algorithms.
 - Event systems are simply a uniform interface for event loops, and events
   can be queued up in user space ([Equeue]).
 - Pollsets ({!Netsys_pollset}) are event loops, i.e. they wait for
   file descriptor conditions.
 - Kernel interface ([select], [poll], [epoll] etc.)

{2:tricks Various tricks}

{b Aborting engines:} Unlike other types of threads, cooperative threads
can be aborted at any time. Use the [abort] method:

{[ e # abort() ]}

The engine [e] will then enter the [`Aborted] state as soon as possible.
Often this will happen immediately, but there are also engines where this
takes some time.

{b Exceptions:} There are several ways to signal exceptions. First, the
orderly way:

{[ eps_e (`Error x) esys ]}

This creates an engine that represents an exception as its final state.
Because exceptions cannot always be handled that cleanly, the basic
combinators like [++] always catch exceptions, and represent them in
their final state. For example:

{[
e1 ++ (fun r1 -> raise x)
]}

The [++] operator catches the exception, and the state transitions to
[`Error x] (just as the [eps_e] example would do).

Note that this behavior relates to engines only. If you program event
systems directly, there will be no automatic exception handling. For
example

{[
Unixqueue.once esys g 1.0 (fun () -> raise x)
]}

does not catch [x] in any way. The effect is that the exception falls
through to the caller, which is always [Unixqueue.run].

{b How to jump out of event processing:} The just mentioned way of raising
an exception can be used to leave event processing. Just define

{[ exception Esys_exit ]}

and throw it like

{[
Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit)
]}

and catch it like

{[
try
  Unixqueue.run esys
with
  | Esys_exit -> ...
]}

This always works, even if the call of [Unixqueue.once] occurs inside
an engine expression.

{2:lwt Engines and LWT}

Users who are familiar with LWT will certainly recognize many operations -
although they often have different names. (You may wonder why both
implementations exist - that is a longer story, but essentially Ocamlnet
has older roots, and at a certain stage of development it only knew event
systems but not yet engines. The LWT developers saw this, found it
insufficient, and developed LWT. Unfortunately, they did not base their
new library on Ocamlnet, but chose to reimplement the event loop core. In
the meantime, the Ocamlnet development fixed the deficiencies in their
library. Now we have two good libraries for the same range of problems.)

Compare:
 - Wrap values: [eps_e] vs. [Lwt.return] and [Lwt.fail]
 - Waiting: {!Uq_engines.signal_engine} vs. [Lwt.wait]
 - Sequences: {!Uq_engines.seq_engine} vs. [Lwt.bind]
 - Joining: {!Uq_engines.sync_engine} vs. [Lwt.join], [Lwt.choose], and
   [Lwt.pick]
 - Run until finished: {!Unixqueue.run} vs. [Lwt_main.run]

You should, however, be aware that there are some differences between
the libraries:

 - Delayed vs immediate execution:
   The LWT threads are started immediately, and run until they sleep.
   Only at that point do you need to call [Lwt_main.run]. In contrast,
   Equeue prefers to start everything only at [Unixqueue.run] time.
 - LWT allows one to create Lwt threads that are already terminated
   ([Lwt.return]).
   In Equeue we don't do this - [eps_e] creates an engine which will
   terminate as soon as possible and which results in a constant value.
   Effectively, this means that [eps_e] is seen as a point where threads
   can be switched, whereas this is not the case for [Lwt.return]. This gives more
   opportunities for switching, but there are also more subtle consequences,
   like who is the caller of suspended functions. In Equeue it is always
   the core of the event loop, whereas this is not uniform in LWT.
 - In Equeue all engines can be aborted, whereas in LWT a special
   abstraction needs to be used.
 - In LWT there can be only one active event loop at a time, whereas
   in Equeue there can be as many [event_system] objects as needed. This
   is mainly a restriction for programs using kernel threads: In Equeue,
   every thread can have its own [event_system], but doing the same in
   LWT is impossible (incompatibility with kernel threads on this level).

The remaining question is how to use both facilities in the same program
(provided it is not multi-threaded, which would rule LWT out). The
library {!Uq_lwt} helps here.

The idea is to replace the event loop built into LWT by the Ocamlnet
event loop. This is done for a given [esys] by

{[
class lwt_engine esys =
object
  inherit Lwt_engine.abstract
  inherit Uq_lwt.lwt_backend esys
end

Lwt_engine.set (new lwt_engine esys)
]}

Additionally, it is required that you now always call [Lwt_main.run] for
starting event-based programs, and not [Unixqueue.run esys]. The latter
would not work for technical reasons. Of course, this means that the
main program will be LWT-based.

{b How to call engines from LWT threads}: Assume you are in some
LWT code, and want to call an engine-based function
[f : 'a -> 'b engine]. This can be achieved by first simply calling
the function, and then using an LWT primitive to LWT-wait until the
engine is done:

{[
let call_thread f x =
  let e = f x in
  let waiter, condition = Lwt.wait() in
  Uq_engines.when_state
    ~is_done:(fun r -> Lwt.wakeup condition r)
    ~is_error:(fun x -> Lwt.wakeup_exn condition x)
    e;
  waiter
]}

{b How to call LWT threads from engines:} The reverse for a function
[f : 'a -> 'b Lwt.t]:

{[
let call_e esys f x =
  let thr = f x in
  (* [signal_engine] needs the event system it waits on *)
  let e, signal = Uq_engines.signal_engine esys in
  Lwt.on_success thr (fun r -> signal (`Done r));
  Lwt.on_failure thr (fun x -> signal (`Error x));
  e
]}
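
The engine returned by [call_e] is an ordinary engine, so it can be
composed with [++] and [>>] like any other.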