1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
let debug = false
module T = Sync
(* Create a synchronous channel. [prod] producers write values to it and [cons] consumers take values.
Both producers and consumers try to cancel if they can.
[take_nonblocking] additional consumers also perform a single non-blocking take.
At the end, we check that:
- We received the expected values.
- No processes are still queued up (since everything tries to cancel before finishing).
*)
let test ~prod ~cons ~take_nonblocking () =
let messages = ref [] in
let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in
if debug then log "== start ==";
let t = T.create () in
let finished_producers = ref 0 in
let expected_total = ref 0 in
let received = ref 0 in
let cancelled_consumers = ref 0 in
let cancelled_producers = ref 0 in
let run_consumer l =
Fake_sched.run
(fun () ->
match T.take t with
| Error `Closed -> assert false
| Ok v ->
if debug then log "c%d: Recv %d" l v;
received := !received + v
| exception Eio__core.Cancel.Cancelled _ ->
if debug then log "c%d: Cancelled" l;
incr cancelled_consumers
)
|> Option.iter (fun ctx ->
if debug then log "c%d: Suspended" l;
Fake_sched.cancel ctx;
)
in
let run_producer v =
Fake_sched.run
(fun () ->
match T.put t v with
| () ->
if debug then log "p%d: Sent" v;
expected_total := !expected_total + v;
incr finished_producers
| exception Eio__core.Cancel.Cancelled _ ->
if debug then log "p%d: Cancelled" v;
incr finished_producers;
incr cancelled_producers
)
|> Option.iter (fun ctx ->
if debug then log "p%d: Suspended sending" v;
Fake_sched.cancel ctx
)
in
for i = 1 to prod do
Atomic.spawn (fun () -> run_producer i)
done;
for i = 1 to cons do
Atomic.spawn (fun () -> run_consumer i)
done;
for i = 1 to take_nonblocking do
Atomic.spawn (fun () ->
match T.take_nonblocking t with
| Error `Closed -> assert false
| Error `Would_block ->
if debug then log "nb%d: found nothing" i;
incr cancelled_consumers;
| Ok v ->
if debug then log "nb%d: took %d" i v;
received := !received + v
)
done;
Atomic.final (fun () ->
if debug then (
List.iter print_string (List.rev !messages);
Fmt.pr "%a@." T.dump t;
Fmt.pr "Received total = %d/%d (%d/%d cancelled consumers)@."
!received !expected_total
!cancelled_consumers (cons + take_nonblocking);
Fmt.pr "Finished producers = %d/%d (incl %d cancelled)@."
!finished_producers prod
!cancelled_producers;
);
assert (!finished_producers = prod);
(* Everyone finishes by trying to cancel (if they didn't succeed immediately),
so there shouldn't be any balance at the end. *)
assert (T.balance t = Ok 0);
assert (!received = !expected_total);
)
(* A producer puts "A" and then closes the stream.
Two consumers try to read. One gets the "A", the other gets end-of-stream. *)
let test_close () =
let t = T.create () in
let got = ref [] in
Atomic.spawn
(fun () ->
let _ : Sync.Cancel.t option = Fake_sched.run (fun () -> T.put t "A"; T.close t) in
()
);
for _ = 1 to 2 do
Atomic.spawn
(fun () ->
let _ : Sync.Cancel.t option = Fake_sched.run (fun () ->
let msg = T.take t |> Result.value ~default:"end-of-stream" in
got := msg :: !got
)
in
()
);
done;
Atomic.final (fun () ->
let results = List.sort String.compare !got in
if debug then (
Fmt.pr "%a@." T.dump t;
Fmt.pr "%a@." Fmt.(Dump.list string) results;
);
assert (results = ["A"; "end-of-stream"]);
assert (T.balance t = Error `Closed);
)
(* A producer tries to add an item (but never succeeds, as there are no consumers).
At some point, the stream is closed and the operation aborts. *)
let test_close2 () =
let t = T.create () in
let result = ref "Waiting" in
Atomic.spawn
(fun () ->
let _ : Sync.Cancel.t option = Fake_sched.run (fun () ->
match T.put t "A" with
| () -> failwith "Shouldn't succeed with no consumer!"
| exception (Invalid_argument msg) -> result := msg
) in
()
);
Atomic.spawn (fun () -> T.close t);
Atomic.final (fun () ->
if debug then (
Fmt.pr "%a@." T.dump t;
Fmt.pr "%s@." !result;
);
match !result with
| "Stream closed" -> ()
| x -> failwith x
)
let () =
Atomic.trace (test ~prod:1 ~cons:1 ~take_nonblocking:1);
Atomic.trace (test ~prod:2 ~cons:1 ~take_nonblocking:0);
Atomic.trace (test ~prod:1 ~cons:2 ~take_nonblocking:0);
Atomic.trace test_close;
Atomic.trace test_close2;
|