File: fiber.mli

package info (click to toggle)
ocaml-dune 2.7.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 14,064 kB
  • sloc: ml: 70,777; lisp: 466; ansic: 241; sh: 209; makefile: 119; python: 38; cpp: 17; javascript: 6
file content (263 lines) | stat: -rw-r--r-- 7,613 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
(** Concurrency library

    This module implements
    {{:https://en.wikipedia.org/wiki/Structured_concurrency} "structured
    concurrency"}. *)

open! Stdune

(** {1 Generals} *)

(** Type of fiber. A fiber represent a suspended computation. Note that using
    the same fiber twice will execute it twice, which is probably not what you
    want. To share the result of a fiber, use an [Ivar.t]. *)
type 'a t

(** Create a fiber that has already terminated. *)
val return : 'a -> 'a t

(** Converts a thunk to a fiber, making sure the thunk runs in the context of
    the fiber (rather than applied in the current context).

    Equivalent to [(>>=) (return ())], but more explicit. *)
val of_thunk : (unit -> 'a t) -> 'a t

(** Fiber that never completes. *)
val never : 'a t

module O : sig
  (** [>>>] is a sequencing operator. [a >>> b] is the fiber that first executes
      [a] and then [b]. *)
  val ( >>> ) : unit t -> 'a t -> 'a t

  (** [>>=] is similar to [>>>] except that the result of the first fiber is
      used to create the second one. *)
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t

  (** [t >>| f] is the same as [t >>= fun x -> return (f x)] but slightly more
      efficient. *)
  val ( >>| ) : 'a t -> ('a -> 'b) -> 'b t

  val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t

  val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
end

val map : 'a t -> f:('a -> 'b) -> 'b t

val bind : 'a t -> f:('a -> 'b t) -> 'b t

(** {1 Joining} *)

(** The following combinators are helpers to combine the result of several
    fibers into one. Note that they do not introduce parallelism. *)

val both : 'a t -> 'b t -> ('a * 'b) t

val sequential_map : 'a list -> f:('a -> 'b t) -> 'b list t

val sequential_iter : 'a list -> f:('a -> unit t) -> unit t

(** {1 Forking + joining} *)

(** The following functions combine forking 2 or more fibers followed by joining
    the results. The execution of the various fibers might be interleaved,
    however once the combining fiber has terminated, it is guaranteed that there
    are no fibers lingering around. *)

(** Start two fibers and wait for their results. *)
val fork_and_join : (unit -> 'a t) -> (unit -> 'b t) -> ('a * 'b) t

(** Same but assume the first fiber returns [unit]. *)
val fork_and_join_unit : (unit -> unit t) -> (unit -> 'a t) -> 'a t

(** Map a list in parallel. *)
val parallel_map : 'a list -> f:('a -> 'b t) -> 'b list t

(** Iter over a list in parallel. *)
val parallel_iter : 'a list -> f:('a -> unit t) -> unit t

(** {1 Local storage} *)

(** Variables local to a fiber *)
module Var : sig
  type 'a fiber = 'a t

  type 'a t

  (** Create a new variable *)
  val create : unit -> 'a t

  (** [get var] reads the value of [var]. *)
  val get : 'a t -> 'a option

  (** Same as [get] but raises if [var] is unset. *)
  val get_exn : 'a t -> 'a

  (** [set var value fiber] sets [var] to [value] during the execution of
      [fiber].

      For instance, the following fiber always evaluate to [true]:

      {[ set v x (get_exn v >>| fun y -> x = y) ]} *)
  val set : 'a t -> 'a -> (unit -> 'b fiber) -> 'b fiber

  val set_sync : 'a t -> 'a -> (unit -> 'b) -> 'b

  val unset : 'a t -> (unit -> 'b fiber) -> 'b fiber

  val unset_sync : 'a t -> (unit -> 'b) -> 'b
end
with type 'a fiber := 'a t

(** {1 Error handling} *)

(** [with_error_handler f ~on_error] calls [on_error] for every exception raised
    during the execution of [f]. This include exceptions raised when calling
    [f ()] or during the execution of fibers after [f ()] has returned.
    Exceptions raised by [on_error] are passed on to the parent error handler.

    It is guaranteed that after the fiber has returned a value, [on_error] will
    never be called. *)
val with_error_handler :
  (unit -> 'a t) -> on_error:(Exn_with_backtrace.t -> unit) -> 'a t

(** [fold_errors f ~init ~on_error] calls [on_error] for every exception raised
    during the execution of [f]. This include exceptions raised when calling
    [f ()] or during the execution of fibers after [f ()] has returned.

    Exceptions raised by [on_error] are passed on to the parent error handler. *)
val fold_errors :
     (unit -> 'a t)
  -> init:'b
  -> on_error:(Exn_with_backtrace.t -> 'b -> 'b)
  -> ('a, 'b) Result.t t

(** [collect_errors f] is:
    [fold_errors f ~init:\[\] ~on_error:(fun e l -> e :: l)] *)
val collect_errors :
  (unit -> 'a t) -> ('a, Exn_with_backtrace.t list) Result.t t

(** [finalize f ~finally] runs [finally] after [f ()] has terminated, whether it
    fails or succeeds. *)
val finalize : (unit -> 'a t) -> finally:(unit -> unit t) -> 'a t

(** {1 Synchronization} *)

(** Write once variables *)
module Ivar : sig
  type 'a fiber

  (** A ivar is a synchronization variable that can be written only once. *)
  type 'a t

  (** Create a new empty ivar. *)
  val create : unit -> 'a t

  (** Read the contents of the ivar. *)
  val read : 'a t -> 'a fiber

  (** Fill the ivar with the following value. This can only be called once for a
      given ivar. *)
  val fill : 'a t -> 'a -> unit fiber

  (** Return [Some x] is [fill t x] has been called previously. *)
  val peek : 'a t -> 'a option fiber
end
with type 'a fiber := 'a t

(** Mailbox variables *)
module Mvar : sig
  type 'a fiber

  (** A mailbox variable can be thought of as a box that is either empty or
      full. [create ()] creates a new empty box, and [create_full x] creates a
      new full box containing [x].

      [read] removes the value from a full mailbox variable and returns it, but
      blocks if the mvar is currently empty. Symmetrically, [write] puts a value
      into the mvar but blocks if the mvar is already full. *)

  type 'a t

  val create : unit -> 'a t

  val create_full : 'a -> 'a t

  val read : 'a t -> 'a fiber

  val write : 'a t -> 'a -> unit fiber
end
with type 'a fiber := 'a t

module Mutex : sig
  type 'a fiber = 'a t

  type t

  val create : unit -> t

  val with_lock : t -> (unit -> 'a fiber) -> 'a fiber
end
with type 'a fiber := 'a t

module Throttle : sig
  (** Limit the number of jobs *)

  type 'a fiber = 'a t

  type t

  (** [create n] creates a throttler that allows to run [n] jobs at once *)
  val create : int -> t

  (** How many jobs can run at the same time *)
  val size : t -> int

  (** Change the number of jobs that can run at once *)
  val resize : t -> int -> unit fiber

  (** Execute a fiber, waiting if too many jobs are already running *)
  val run : t -> f:(unit -> 'a fiber) -> 'a fiber

  (** Return the number of jobs currently running *)
  val running : t -> int
end
with type 'a fiber := 'a t

module Sequence : sig
  type 'a fiber = 'a t

  type 'a t = 'a node fiber

  and 'a node =
    | Nil
    | Cons of 'a * 'a t

  val sequential_iter : 'a t -> f:('a -> unit fiber) -> unit fiber

  (** [parallel_iter t ~f] is the same as:

      {[
        let rec loop t ~f =
          t >>= function
          | Nil -> return ()
          | Cons (x, t) ->
            fork_and_join_unit (fun () -> f x) (fun () -> loop t ~f)
      ]}

      except that if the sequence is infinite, the above code would leak memory
      while [parallel_iter] does not. This function can typically be used to
      process a sequence of events. *)
  val parallel_iter : 'a t -> f:('a -> unit fiber) -> unit fiber
end
with type 'a fiber := 'a t

(** {1 Running fibers} *)

type fill = Fill : 'a Ivar.t * 'a -> fill

(** [run t ~iter] runs a fiber until it terminates. [iter] is used to implement
    the scheduler, it should block waiting for an event and return an ivar to
    fill. *)
val run : 'a t -> iter:(unit -> fill) -> 'a