File: stream.mli

package info (click to toggle)
ocaml-eio 1.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,548 kB
  • sloc: ml: 14,608; ansic: 1,237; makefile: 25
file content (50 lines) | stat: -rw-r--r-- 1,594 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
(** Reading from an empty queue will wait until an item is available.
    Writing to a full queue will wait until there is space.

    Example:
    {[
      let t = Stream.create 100 in
      Stream.add t 1;
      Stream.add t 2;
      assert (Stream.take t = 1);
      assert (Stream.take t = 2)
    ]}

    Streams are thread-safe and so can be shared between domains and used
    to communicate between them. *)

type 'a t
(** A queue of items of type ['a]. *)

val create : int -> 'a t
(** [create capacity] is a new stream which can hold up to [capacity] items without blocking writers.

    - If [capacity = 0] then writes block until a reader is ready.
    - If [capacity = 1] then this acts as a "mailbox".
    - If [capacity = max_int] then the stream is effectively unbounded. *)

val add : 'a t -> 'a -> unit
(** [add t item] adds [item] to [t].

    If this would take [t] over capacity, it blocks until there is space. *)

val take : 'a t -> 'a
(** [take t] takes the next item from the head of [t].

    If no items are available, it waits until one becomes available. *)

val take_nonblocking : 'a t -> 'a option
(** [take_nonblocking t] is like [Some (take t)] except that
    it returns [None] if the stream is empty rather than waiting.

    Note that if another domain may add to the stream then a [None]
    result may already be out-of-date by the time this returns. *)

val length : 'a t -> int
(** [length t] returns the number of items currently in [t]. *)

val is_empty : 'a t -> bool
(** [is_empty t] is [length t = 0]. *)

val dump : 'a t Fmt.t
(** For debugging. *)