File: joinTextProc.ml

package info (click to toggle)
jocaml 4.01.0-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 16,736 kB
  • ctags: 23,836
  • sloc: ml: 111,262; ansic: 32,746; sh: 6,057; lisp: 4,230; makefile: 3,861; asm: 3,734; awk: 88; perl: 45; fortran: 21; sed: 19; cs: 9
file content (194 lines) | stat: -rw-r--r-- 5,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
let verbose =
  try
    int_of_string (Sys.getenv "TEXTVERBOSE")
  with _ -> 0


open Printf

let debug tag =
  if verbose > 0 then Join.debug tag
  else  ksprintf (fun _ -> ())


let pp_status = function
  | Unix.WEXITED i -> sprintf "EXIT %i" i
  | Unix.WSIGNALED i -> sprintf "SIGNAL %i" i
  | _ -> assert false

let safe_kill pid s =
  debug "TEXT" "KILL@%i" pid ;
  try Unix.kill pid s
  with Unix.Unix_error _ -> ()

let safe_close_out chan =
  try close_out chan
  with Sys_error _msg -> ()

let rec safe_wait pid =
  try
    let _,st = Unix.waitpid [] pid in
    debug "TEXT" "WAIT@%i: %s" pid (pp_status st);
    st
  with
  | Unix.Unix_error (Unix.EINTR,_,_) -> safe_wait pid



module Async = struct

  type producer = string JoinCom.P.t

  type t =
      { out : producer ;
        err : producer ;
        waitpid : Unix.process_status Join.chan Join.chan ;
        kill : int -> unit;
        gkill : int -> unit; }


  let et =
    let ep = JoinCom.P.empty() in
    { out=ep; err=ep;
      waitpid=(def k(_) = 0 in k);
      kill=(fun _ -> ());
      gkill=(fun _ -> ());
    }

      
  def producer_to_chan (prod,chan) = JoinCom.P.to_text_close (prod,chan)

  let async_kill pid prods chans =
    def kill(sid) =
      debug "TEXT" "KILL %i -> %i" sid pid ;
      List.iter (fun p -> spawn p.JoinCom.P.kill()) prods ;
      List.iter safe_close_out chans ;      
      safe_kill pid sid ;
      reply to kill in
    kill

  let add_kill_wait pid prods chans waited waitpid r =
    spawn waited(safe_wait pid) ;
    { r with
      kill=async_kill pid prods chans;
      gkill=async_kill (-pid) prods chans;
      waitpid=waitpid; }      

  let of_text chan = JoinCom.P.of_text chan

  let command cmd argv =
    let pid = JoinProc.command cmd argv in
    def waited(st) & waitpid(k) = waited(st) & k(st) in
    add_kill_wait pid [] [] waited waitpid et

  let open_in cmd argv =
    let pid,in_chan = JoinProc.open_in cmd argv in
    debug "TEXT" "START %i" pid ;
    def waited(st) & waitpid(k) = waited(st) & k(st) in
    let out = of_text in_chan in
    add_kill_wait pid [out] [] waited waitpid { et with out; }

  let open_out cmd argv input =
    let pid,out_chan = JoinProc.open_out cmd argv in
    debug "TEXT" "START %i" pid ;
    def waited(st) & waitpid(k) = waited(st) & k(st) in
    spawn producer_to_chan (input,out_chan) ;
    add_kill_wait pid [] [out_chan] waited waitpid et

  let open_in_out cmd argv input =
    let pid,(in_chan,out_chan) = JoinProc.open_in_out cmd argv in
    debug "TEXT" "START %i" pid ;
    def waited(st) & waitpid(k) = waited(st) & k(st) in
    let out = of_text in_chan in
    spawn producer_to_chan (input,out_chan) ;
    add_kill_wait pid [out] [out_chan] waited waitpid { et with out; }


  let open_full cmd argv input =
    let pid,(in_chan, out_chan,err_chan) = JoinProc.open_full cmd argv in
    debug "TEXT" "START %i" pid ;
    def waited(st) & waitpid(k) = waited(st) & k(st) in
    let out = of_text in_chan
    and err = of_text err_chan in
    spawn producer_to_chan (input,out_chan) ;
    add_kill_wait pid [out;err] [out_chan] waited waitpid { et with out; err;}

end

module Sync = struct

  type text = string list

  type result =
      { st : Unix.process_status ;
        out : text ;
        err : text ; }

  let er = { st=Unix.WEXITED (-1) ; out=[]; err=[]; }
    
  type t =
      { wait : unit  -> result;
        kill : int -> unit;
        gkill : int -> unit; }


  module P = JoinCom.P

  let list_to_producer = P.of_list

  def consume (prod,k) =
    P.to_list(prod,def kk(xs) = prod.P.kill() & k(xs) in kk) 

  let fst xs = match xs with
  | x::_ -> x
  | [] -> ""

  let tagsync = "SYNC"

  let verb tag k =
     def v(r) =  debug tagsync "%s: %s" tag (fst r) ; k(r) in v

  let command cmd argv =
    let ext = Async.command cmd argv in
    def wait_ter() & waitpid(st) =
      reply { er with st=st; } to wait_ter in
    let () = spawn ext.Async.waitpid(waitpid) in
    { wait=wait_ter; kill=ext.Async.kill; gkill=ext.Async.gkill; }

  let open_in cmd argv =
    let ext = Async.open_in cmd argv in
    def wait_ter() & waitpid(st) & out(os) =
      reply { er with st=st; out=os; } to wait_ter in
    let () = spawn begin
      consume(ext.Async.out,verb "OUT" out) &
      ext.Async.waitpid(waitpid)
    end in
    { wait=wait_ter; kill=ext.Async.kill; gkill=ext.Async.gkill; }


  let open_in_out cmd argv =
    let f = Async.open_in_out cmd argv in
    def fork(input) =
      let ext = f(list_to_producer(input)) in
      def wait_ter() & waitpid(st) & out(os) =
        reply { er with st=st; out=os; } to wait_ter in
      consume(ext.Async.out,verb "OUT" out) &
      ext.Async.waitpid(waitpid) &
      reply { wait=wait_ter; kill=ext.Async.kill; gkill=ext.Async.gkill;}
       to fork in
    fork


  let open_full cmd argv =
    let f = Async.open_full cmd argv in
    def fork(input) =
      let ext = f(list_to_producer(input)) in
      def wait_ter() & waitpid(st) & out(os) & err(es) =
        reply { st=st; out=os; err=es } to wait_ter in
      consume(ext.Async.out,verb "OUT" out) &
      consume(ext.Async.err,verb "ERR" err) &
      ext.Async.waitpid(waitpid) &
      reply { wait=wait_ter; kill=ext.Async.kill; gkill=ext.Async.gkill; }
      to fork in
    fork
end