1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
From: pveber <philippe.veber@gmail.com>
Date: Mon, 1 Apr 2013 12:06:03 +0200
Subject: slightly more efficient fix for concurrent access to [in_stream]
now each pool has its own mutex.
nproc.ml | 34 +++++++++++++++-------------------
1 file changed, 15 insertions(+), 19 deletions(-)
diff --git a/nproc.ml b/nproc.ml
index d42ccef..62252ce 100644
@@ -153,26 +153,21 @@ struct
try Unix.waitpid [] pid
with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
- let mutex = Lwt_mutex.create ()
-
(* --master-- *)
- let pull_task kill_workers in_stream central_service worker =
+ let pull_task kill_workers in_stream in_stream_mutex central_service worker =
(* Note: input and output file descriptors are automatically closed
when the end of the lwt channel is reached. *)
let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
let rec pull () =
- Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
- Lwt.bind (Lwt_stream.get in_stream) (
- function
- | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
- | Some (f, x, g) ->
- Lwt_mutex.unlock mutex ;
- let req = Worker_req (f, x) in
- Lwt.bind
- (write_value oc req)
- (read_from_worker g)
- )
+ Lwt.bind (Lwt_mutex.with_lock in_stream_mutex (fun () -> Lwt_stream.get in_stream)) (
+ function
+ | None -> Lwt.return ()
+ | Some (f, x, g) ->
+ let req = Worker_req (f, x) in
+ Lwt.bind
+ (write_value oc req)
+ (read_from_worker g)
)
and read_from_worker g () =
Lwt.try_bind
@@ -219,7 +214,7 @@ struct
pull ()
(* --master-- *)
- let create_gen init (in_stream, push) nproc central_service worker_data =
+ let create_gen init ((in_stream, push), in_stream_mutex) nproc central_service worker_data =
let proc_pool = Array.make nproc None in
Array.iteri (
fun i _ ->
@@ -286,7 +281,7 @@ struct
let jobs =
Lwt.join
(List.map
- (pull_task kill_workers in_stream central_service)
+ (pull_task kill_workers in_stream in_stream_mutex central_service)
worker_info)
in
@@ -315,7 +310,7 @@ struct
let default_init worker_info = ()
let create ?(init = default_init) nproc central_service worker_data =
- create_gen init (Lwt_stream.create ()) nproc central_service worker_data
+ create_gen init (Lwt_stream.create (), Lwt_mutex.create ()) nproc central_service worker_data
let close p =
p.close ()
@@ -402,8 +397,9 @@ struct
in
let p, t =
create_gen init
- (task_stream,
- (fun _ -> assert false) (* push *))
+ ((task_stream,
+ (fun _ -> assert false) (* push *)),
+ Lwt_mutex.create ())
nproc serv env
in
try
|