File: 0004-slightly-more-efficient-fix-for-concurrent-access-to.patch

package info (click to toggle)
nproc 0.5.1-5
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 208 kB
  • sloc: ml: 1,656; makefile: 85
file content (88 lines) | stat: -rw-r--r-- 3,006 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
From: pveber <philippe.veber@gmail.com>
Date: Mon, 1 Apr 2013 12:06:03 +0200
Subject: slightly more efficient fix for concurrent access to [in_stream]

now each pool has its own mutex.
---
 nproc.ml | 34 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/nproc.ml b/nproc.ml
index d42ccef..62252ce 100644
--- a/nproc.ml
+++ b/nproc.ml
@@ -153,26 +153,21 @@ struct
     try Unix.waitpid [] pid
     with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
 
-  let mutex = Lwt_mutex.create ()
-
   (* --master-- *)
-  let pull_task kill_workers in_stream central_service worker =
+  let pull_task kill_workers in_stream in_stream_mutex central_service worker =
     (* Note: input and output file descriptors are automatically closed 
        when the end of the lwt channel is reached. *)
     let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
     let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
     let rec pull () =
-      Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
-        Lwt.bind (Lwt_stream.get in_stream) (
-          function
-          | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
-          | Some (f, x, g) ->
-              Lwt_mutex.unlock mutex ;
-              let req = Worker_req (f, x) in
-              Lwt.bind
-                (write_value oc req)
-                (read_from_worker g)
-        )
+      Lwt.bind (Lwt_mutex.with_lock in_stream_mutex (fun () -> Lwt_stream.get in_stream)) (
+        function
+        | None -> Lwt.return ()
+        | Some (f, x, g) ->
+            let req = Worker_req (f, x) in
+            Lwt.bind
+              (write_value oc req)
+              (read_from_worker g)
       )
     and read_from_worker g () =
       Lwt.try_bind
@@ -219,7 +214,7 @@ struct
     pull ()
 
   (* --master-- *)
-  let create_gen init (in_stream, push) nproc central_service worker_data =
+  let create_gen init ((in_stream, push), in_stream_mutex) nproc central_service worker_data =
     let proc_pool = Array.make nproc None in
     Array.iteri (
       fun i _ ->
@@ -286,7 +281,7 @@ struct
     let jobs =
       Lwt.join 
         (List.map
-           (pull_task kill_workers in_stream central_service)
+           (pull_task kill_workers in_stream in_stream_mutex central_service)
            worker_info)
     in
 
@@ -315,7 +310,7 @@ struct
   let default_init worker_info = ()
 
   let create ?(init = default_init) nproc central_service worker_data =
-    create_gen init (Lwt_stream.create ()) nproc central_service worker_data
+    create_gen init (Lwt_stream.create (), Lwt_mutex.create ()) nproc central_service worker_data
 
   let close p =
     p.close ()
@@ -402,8 +397,9 @@ struct
       in
       let p, t =
         create_gen init 
-          (task_stream,
-           (fun _ -> assert false) (* push *))
+          ((task_stream,
+            (fun _ -> assert false) (* push *)),
+           Lwt_mutex.create ())
           nproc serv env
       in
       try