1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
type 'a state =
| Resolved of 'a
| Unresolved of Broadcast.t
type !'a promise = {
id : Trace.id;
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
}
type +!'a t
type -!'a u
type 'a or_exn = ('a, exn) result t
let to_public_promise : 'a promise -> 'a t = Obj.magic
let to_public_resolver : 'a promise -> 'a u = Obj.magic
let of_public_promise : 'a t -> 'a promise = Obj.magic
let of_public_resolver : 'a u -> 'a promise = Obj.magic
let create_with_id id =
let t = {
id;
state = Atomic.make (Unresolved (Broadcast.create ()));
} in
to_public_promise t, to_public_resolver t
let create ?label () =
let id = Trace.mint_id () in
Trace.create_obj ?label id Promise;
create_with_id id
let create_resolved x =
let id = Trace.mint_id () in
Trace.create_obj id Promise;
to_public_promise { id; state = Atomic.make (Resolved x) }
let await t =
let t = of_public_promise t in
match Atomic.get t.state with
| Resolved x ->
Trace.get t.id;
x
| Unresolved b ->
Suspend.enter "Promise.await" (fun ctx enqueue ->
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
| None -> () (* We got resumed immediately *)
| Some request ->
match Atomic.get t.state with
| Resolved _ ->
(* The promise was resolved as we were suspending.
Resume now if we haven't already done so. *)
if Broadcast.cancel request then enqueue (Ok ())
| Unresolved _ ->
(* We observed the promise to be still unresolved after registering a waiter.
Therefore any resolution must happen after we were registered and we will be notified. *)
Trace.try_get t.id;
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
(* else already resumed *)
)
);
match Atomic.get t.state with
| Resolved x ->
Trace.get t.id;
x
| Unresolved _ -> assert false
let await_exn t =
match await t with
| Ok x -> x
| Error ex -> raise ex
let try_resolve t v =
let rec resolve' t v =
match Atomic.get t.state with
| Resolved _ -> false
| Unresolved b as prev ->
if Atomic.compare_and_set t.state prev (Resolved v) then (
Trace.put t.id;
Broadcast.resume_all b;
true
) else (
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
resolve' t v
)
in
resolve' (of_public_resolver t) v
let resolve u x =
if not (try_resolve u x) then
invalid_arg "Can't resolve already-resolved promise"
let resolve_ok u x = resolve u (Ok x)
let resolve_error u x = resolve u (Error x)
let peek t =
let t = of_public_promise t in
match Atomic.get t.state with
| Unresolved _ -> None
| Resolved x -> Some x
let id t =
let t = of_public_promise t in
t.id
let is_resolved t =
Option.is_some (peek t)
|