File: bench_stream.ml

package info (click to toggle)
ocaml-eio 1.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,548 kB
  • sloc: ml: 14,608; ansic: 1,237; makefile: 25
file content (90 lines) | stat: -rw-r--r-- 3,323 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
(* Some sender domains each run a bunch of fibers submitting items to a stream.
   Some receiver domains each run a single fiber accepting items from the stream.
   It also tests the single-domain case. *)

open Eio.Std

let n_sender_fibers = 10        (* Concurrent sending fibers per sending domain *)

(* Simulate other work in the domain, and also prevent it from going to sleep.
   Otherwise, we're just measuring how long it takes the OS to wake a sleeping thread. *)
let rec spin () =
  Fiber.yield ();
  spin ()

(* [n_fibers] fibers each send values [1..n_iters] to [stream]. *)
let run_sender ~n_fibers ~n_iters stream =
  Switch.run @@ fun sw ->
  Fiber.fork_daemon ~sw spin;
  for _ = 1 to n_fibers do
    Fiber.fork ~sw (fun () ->
        for i = 1 to n_iters do
          Eio.Stream.add stream i
        done
      )
  done

(* Read [n_iters] values from [stream] and add them to [total] (at the end). *)
let run_recv ~n_iters ~total stream =
  Switch.run @@ fun sw ->
  Fiber.fork_daemon ~sw spin;
  let rec aux acc = function
    | 0 -> acc
    | i -> aux (acc + Eio.Stream.take stream) (i - 1) in
  ignore (Atomic.fetch_and_add total (aux 0 n_iters) : int)

(* Run the tests using [n_sender_domains] additional domains to send (0 to send
   and receive in a single domain). When [n_sender_domains > 0], we also use
   that many receiver domains. *)
let run_bench ~domain_mgr ~clock ~n_send_domains ~n_iters ~capacity =
    let stream = Eio.Stream.create capacity in
    let total = Atomic.make 0 in                (* Total received (sanity check at the end) *)
    let n_senders = max 1 n_send_domains in
    let n_iters_total = (* Total number of items to be sent through [stream] *)
      n_iters * n_sender_fibers * n_senders
    in
    Gc.full_major ();
    let t0 = Eio.Time.now clock in
    Switch.run (fun sw ->
        let run_sender () = run_sender ~n_fibers:n_sender_fibers ~n_iters stream in
        if n_send_domains > 0 then (
          for _ = 1 to n_send_domains do
            Fiber.fork ~sw (fun () -> Eio.Domain_manager.run domain_mgr run_sender)
          done
        ) else (
          Fiber.fork ~sw run_sender
        );
        let run_recv () = run_recv ~n_iters:(n_iters * n_sender_fibers) ~total stream in
        for _ = 1 to n_senders - 1 do
          Fiber.fork ~sw @@ fun () ->
          Eio.Domain_manager.run domain_mgr run_recv
        done;
        Fiber.fork ~sw run_recv
      );
    let t1 = Eio.Time.now clock in
    let total = Atomic.get total in
    let expected_total = n_senders * n_sender_fibers * (n_iters * (1 + n_iters) / 2) in
    assert (total = expected_total);
    let time_total = t1 -. t0 in
    let time_per_iter = time_total /. float n_iters_total in
    Metric.create
      (Printf.sprintf "sender-domains:%d iterations:%d capacity:%d" n_send_domains n_iters capacity)
      (`Float (1e9 *. time_per_iter)) "ns"
      "Time to transmit one item over the stream"

let main ~domain_mgr ~clock =
  [0, 100_000;
   1, 100_000;
   2, 100_000;
   4, 100_000;
  ]
  |> List.concat_map (fun (n_send_domains, n_iters) ->
      [0; 1; 100] |> List.map (fun capacity ->
          run_bench ~domain_mgr ~clock ~n_send_domains ~n_iters ~capacity
        )
    )

let run env =
  main
    ~domain_mgr:(Eio.Stdenv.domain_mgr env)
    ~clock:(Eio.Stdenv.clock env)