1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
(* A pool is a sequence of cells containing either available slots or consumers waiting for them.
A slot may or may not contain an actual resource.
To use a resource:
1. Get the next "suspend" cell. If it contains a resource slot, use it.
2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell).
3. Either way, wait for the cell to be resumed with a slot.
4. Once you have a slot, ensure it contains a resource, creating one if not.
5. When done, add the slot back (in the next resume cell).
*)
(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
(* A slot is a mutable holder for at most one resource.
   It contains [None] until a resource has been allocated for it (see [run_with]). *)
type 'a slot = 'a option ref
module Cell = struct
  (* Legal state transitions of a cell, and the party performing each one:

     1. Suspender : In_transition -> Request      Suspender waits for a resource
        1.1. Resumer   : Request -> Finished      Resumer then provides a resource
        1.2. Suspender : Request -> Finished      Suspender cancels
     2. Resumer   : In_transition -> Resource     Resumer provides a spare resource
        2.1. Suspender : Resource -> Finished     Suspender doesn't need to wait

     In addition, a non-blocking [use] at capacity moves its cell straight from
     [In_transition] to [Finished]; a resumer that later finds the cell in that
     state simply moves on to another cell.
  *)
  type 'a t =
    | In_transition                 (* Initial state: nobody has written to the cell yet *)
    | Request of ('a slot -> unit)  (* A consumer is waiting; call the function to hand it a slot *)
    | Resource of 'a slot           (* A slot is available for the next consumer *)
    | Finished                      (* The cell has been consumed (or abandoned) *)

  let init = In_transition

  let segment_order = 2

  (* [dump f cell] prints the name of [cell]'s state (without any payload) on [f]. *)
  let dump f cell =
    let label =
      match cell with
      | In_transition -> "In_transition"
      | Request _ -> "Request"
      | Resource _ -> "Resource"
      | Finished -> "Finished"
    in
    Fmt.string f label
end
(* The queue of cells used by the pool, obtained by applying [Cells.Make] to [Cell]. *)
module Q = Cells.Make(Cell)
(* A pool of resources of type ['a]. *)
type 'a t = {
slots : int Atomic.t; (* Total resources, available and in use (counts slots, including ones that are still empty) *)
max_slots : int; (* No new slot is created once [slots] reaches this value *)
alloc : unit -> 'a; (* Creates a fresh resource *)
validate : 'a -> bool; (* Checked before reusing a stored resource; if [false] the resource is disposed and replaced *)
dispose : 'a -> unit; (* Called on resources that are being discarded *)
q : 'a Q.t; (* The cells: each ends up holding either a free slot or a waiting consumer *)
}
(* [create ?validate ?dispose max_size alloc] is a new, initially empty pool
   that will hold at most [max_size] slots.

   - [alloc] is called to make a new resource when one is needed.
   - [validate] (default: always [true]) decides whether a stored resource may be reused.
   - [dispose] (default: [ignore]) is called on resources that are discarded.

   Raises [Invalid_argument] if [max_size] is not strictly positive. *)
let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc =
  if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0";
  let slots = Atomic.make 0 in
  let q = Q.make () in
  { slots; max_slots = max_size; alloc; validate; dispose; q }
(* [add t x] makes the slot [x] available to consumers of [t].

   It takes the next resume cell and either stores [x] in it (if no consumer has
   reached the cell yet) or passes [x] directly to the consumer waiting there.
   If the cell turns out to be finished already, it starts again with a new cell. *)
let rec add t x =
  let cell = Q.next_resume t.q in
  let rec publish () =
    match Atomic.get cell with
    | Cell.Resource _ ->
      (* Can't happen; only a resumer can set this, and we're the resumer. *)
      assert false
    | Cell.Finished ->
      (* The consumer cancelled. Get another cell and retry. *)
      add t x
    | Cell.In_transition ->
      (* Nobody is waiting yet: try to leave the slot for a future consumer.
         If the cell changed under us, look at it again. *)
      let stored =
        Atomic.compare_and_set cell Cell.In_transition (Cell.Resource x)
      in
      if not stored then publish ()
    | Cell.Request wake as seen ->
      (* [seen] is the exact value we read, as the CAS compares physically. *)
      (match Atomic.compare_and_set cell seen Cell.Finished with
       | true -> wake x     (* We had a consumer waiting. Give it to them. *)
       | false -> add t x)  (* Consumer cancelled; retry with another cell. *)
  in
  publish ()
(* [cancel segment cell] tries to withdraw a pending request by forcing [cell]
   to [Finished]. Returns [true] if the request was still pending (in which case
   the cell is also reported as cancelled to its [segment]), or [false] if a
   resumer got there first.
   Only valid once the caller has itself moved [cell] to [Request]. *)
let cancel segment cell =
  let previous = Atomic.exchange cell Cell.Finished in
  match previous with
  | Cell.Finished ->
    (* Already resumed; reject cancellation *)
    false
  | Cell.Request _ ->
    Q.cancel_cell segment;
    true
  | Cell.Resource _ | Cell.In_transition ->
    (* Can't get here from [Request]. *)
    assert false
(* [maybe_add_slot t current] adds one more (empty) slot to [t], provided that
   [t] is below capacity. [current] is the caller's latest reading of [t.slots];
   it is refreshed and the attempt repeated if another update got in first. *)
let rec maybe_add_slot t current =
  if current >= t.max_slots then ()
  else if Atomic.compare_and_set t.slots current (current + 1) then
    add t (ref None)
  else
    (* Concurrent update; try again *)
    maybe_add_slot t (Atomic.get t.slots)
(* [run_with t f slot] runs [f] on the resource held by [slot] and returns its result.

   A resource is allocated first if [slot] is empty, and a stored resource that
   fails [t.validate] is disposed and replaced by a fresh one.
   [slot] is handed back to [t] afterwards, whether [f] (or the preparation of
   the resource) returned normally or raised; an exception is re-raised with
   its original backtrace. *)
let run_with t f slot =
  (* Allocate a new resource and remember it in the slot. *)
  let refill () =
    let fresh = t.alloc () in
    slot := Some fresh;
    fresh
  in
  (* The resource to pass to [f]: the stored one if still valid, else a new one. *)
  let acquire () =
    match !slot with
    | None -> refill ()
    | Some existing ->
      if t.validate existing then existing
      else begin
        (* Empty the slot first so it never keeps a disposed resource. *)
        slot := None;
        t.dispose existing;
        refill ()
      end
  in
  match f (acquire ()) with
  | result ->
    add t slot;
    result
  | exception exn ->
    let backtrace = Printexc.get_raw_backtrace () in
    add t slot;
    Printexc.raise_with_backtrace exn backtrace
(* [run_new_and_dispose t f] allocates a brand-new resource, applies [f] to it
   and disposes of the resource again, without ever adding it to the pool.
   The resource is disposed whether [f] returns or raises; in the latter case
   the exception is re-raised with its original backtrace. *)
let run_new_and_dispose t f =
  let resource = t.alloc () in
  let result =
    try f resource
    with exn ->
      let backtrace = Printexc.get_raw_backtrace () in
      t.dispose resource;
      Printexc.raise_with_backtrace exn backtrace
  in
  t.dispose resource;
  result
(* [use t ?never_block f] runs [f] on a resource from the pool [t] and returns [f]'s result.

   It takes the next suspend cell of [t.q] and then:
   - if the cell already holds a slot, uses that slot without waiting;
   - otherwise, if [never_block] is [true] and [t] is at capacity, runs [f] on a
   temporary resource via [run_new_and_dispose] (unless a slot was stored in the
   cell in the meantime, in which case that slot is used);
   - otherwise adds a new slot if [t] is under capacity, then suspends until the
   cell is resumed with a slot or the wait is cancelled.

   Whenever a slot is obtained, [f] is run through [run_with], which also hands
   the slot back to the pool afterwards. [never_block] defaults to [false]. *)
let use t ?(never_block=false) f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
(* Neither state is written by [add] to a cell no suspender has touched yet
   (presumably each suspend cell is handed to a single caller - confirm in [Cells]). *)
| Finished | Request _ -> assert false
| Resource slot ->
(* A resumer got here first and left a slot for us. *)
Atomic.set cell Finished; (* Allow value to be GC'd *)
run_with t f slot
| In_transition ->
let current = Atomic.get t.slots in
match current < t.max_slots with
| false when never_block -> (
(* We are at capacity, but cannot block.
Mark the cell [Finished] so that a later [add] skips it. If a slot was stored
in the cell since we read it, use that slot. Otherwise create a new resource
to run f but don't add it to the pool. *)
match Atomic.exchange cell Finished with
| Resource slot -> run_with t f slot
| _ -> run_new_and_dispose t f
)
(* [can_add] is [true] when below capacity. It is [false] only when we are at
   capacity and [never_block] is [false], i.e. we have to wait. *)
| can_add ->
(* Create a slot if not at capacity. *)
if can_add then maybe_add_slot t current;
(* No item is available right now. Start waiting *)
let slot =
Suspend.enter_unchecked "Pool.acquire" (fun ctx enqueue ->
(* [r] is the callback stored in the cell; [add] calls it with the slot.
   Presumably [enqueue (Ok x)] makes [enter_unchecked] return [x] and
   [enqueue (Error ex)] makes it raise [ex] - confirm against [Suspend]. *)
let r x = enqueue (Ok x) in
if Atomic.compare_and_set cell In_transition (Request r) then (
(* Our request is now visible to resumers. *)
match Fiber_context.get_error ctx with
| Some ex ->
(* An error is already recorded for this fiber (presumably it was cancelled
   before we registered): withdraw the request and fail with [ex], unless
   a resumer has already taken the request. *)
if cancel segment cell then enqueue (Error ex);
(* else being resumed *)
| None ->
(* Register the function to run if the fiber is cancelled while waiting. *)
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel segment cell then enqueue (Error ex)
(* else being resumed *)
)
) else (
(* The cell is no longer [In_transition]: a resumer stored a slot in it
   after we first read it. Take the slot and continue at once. *)
match Atomic.exchange cell Finished with
| Resource x -> enqueue (Ok x)
| _ -> assert false
);
)
in
(* assert (Atomic.get cell = Finished); *)
run_with t f slot
|