1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
module Zzz = Eio_utils.Zzz
type job =
| New
| Exit
| Job : {
fn : unit -> 'a;
enqueue : ('a, Eio.Exn.with_bt) result -> unit;
} -> job
(* Mailbox with blocking semaphore *)
module Mailbox = struct
type t = {
available : Semaphore.Binary.t;
mutable cell : job;
}
let create () = { available = Semaphore.Binary.make false; cell = New }
let put mbox x =
(* The Semaphore contains an atomic frontier,
therefore [cell] does not need to be an atomic *)
mbox.cell <- x;
Semaphore.Binary.release mbox.available
let take mbox =
Semaphore.Binary.acquire mbox.available;
mbox.cell
end
module Free_pool = struct
type list =
| Empty
| Closed
| Free of Mailbox.t * list
type t = list Atomic.t
let rec close_list = function
| Free (x, xs) -> Mailbox.put x Exit; close_list xs
| Empty | Closed -> ()
let close t =
let items = Atomic.exchange t Closed in
close_list items
let rec drop t =
match Atomic.get t with
| Closed | Empty -> ()
| Free _ as items ->
if Atomic.compare_and_set t items Empty then close_list items
else drop t
let rec put t mbox =
match Atomic.get t with
| Closed -> assert false
| (Empty | Free _) as current ->
let next = Free (mbox, current) in
if not (Atomic.compare_and_set t current next) then
put t mbox (* concurrent update, try again *)
let make_thread t =
let mbox = Mailbox.create () in
let _thread : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
let result =
try Ok (fn ())
with exn ->
let bt = Printexc.get_raw_backtrace () in
Error (exn, bt)
in
put t mbox; (* Ensure thread is in free-pool before enqueuing. *)
enqueue result
done
) ()
in
mbox
let rec get_thread t =
match Atomic.get t with
| Closed -> invalid_arg "Thread pool closed!"
| Empty -> make_thread t
| Free (mbox, next) as current ->
if Atomic.compare_and_set t current next then mbox
else get_thread t (* concurrent update, try again *)
end
type t = {
free : Free_pool.t;
sleep_q : Zzz.t;
mutable timeout : Zzz.Key.t option;
}
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, Eio.Exn.with_bt) result * t) Effect.t
let terminate t =
Free_pool.close t.free;
Option.iter (fun key -> Zzz.remove t.sleep_q key; t.timeout <- None) t.timeout
let create ~sleep_q =
{ free = Atomic.make Free_pool.Empty; sleep_q; timeout = None }
let run t fn =
match fn () with
| x -> terminate t; x
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
terminate t;
Printexc.raise_with_backtrace ex bt
let submit t ~ctx ~enqueue fn =
match Eio.Private.Fiber_context.get_error ctx with
| Some e -> enqueue (Error (e, Eio.Exn.empty_backtrace))
| None ->
let mbox = Free_pool.get_thread t.free in
Mailbox.put mbox (Job { fn; enqueue })
let run_in_systhread ?(label="systhread") fn =
Eio.Private.Trace.suspend_fiber label;
let r, t = Effect.perform (Run_in_systhread fn) in
if t.timeout = None then (
let time =
Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms)
|> Option.value ~default:Mtime.max_stamp
in
t.timeout <- Some (Zzz.add t.sleep_q time (Fn (fun () -> Free_pool.drop t.free; t.timeout <- None)))
);
match r with
| Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
|