File: subprocess.ml

package info (click to toggle)
monotone-viz 1.0.2-2
  • links: PTS
  • area: main
  • in suites: squeeze, wheezy
  • size: 596 kB
  • ctags: 1,406
  • sloc: ml: 5,687; ansic: 779; makefile: 157
file content (225 lines) | stat: -rw-r--r-- 5,926 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
open Viz_misc

(* Module initialisation value: evaluates [Giochannel.init], discards it,
   and is bound to [Gspawn.init].
   NOTE(review): presumably this exists to force the initialisation of both
   binding modules when this module is referenced -- confirm against
   giochannel.ml / gspawn.ml. *)
let init =
  Giochannel.init ; Gspawn.init

(** [debug fmt args...] is a printf-style tracing helper.

    The message is formatted in every case; it is written to stderr,
    prefixed with the spawn tag and followed by a newline and a flush, only
    when [Viz_misc.debug] returns true for the spawn category.  Otherwise
    the formatted string is dropped. *)
let debug fmt =
  let sink =
    if Viz_misc.debug "spawn"
    then (fun msg -> Printf.eprintf "### spawn: %s\n%!" msg)
    else ignore in
  Printf.kprintf sink fmt

(* How the child's pipes are decoded/encoded, as interpreted by
   [setup_channel] below:
   - NONE:    the channel encoding is set to [None] and buffering is
              switched off;
   - LOCALE:  the charset reported by [Glib.Convert.get_charset] is used,
              unless that call reports UTF-8, in which case the channel is
              left as created;
   - CHARSET: the given charset name is used as the channel encoding. *)
type encoding = [ `NONE | `LOCALE | `CHARSET of string ]


(** [setup_channel ~nonblock encoding fd] wraps [fd] in a [Giochannel.t]
    and configures it.

    @param nonblock when true, the NONBLOCK flag is set on the channel
      (through [Giochannel.set_flags_noerr]).
    @param encoding selects the channel encoding: NONE clears the encoding
      and disables buffering, CHARSET installs the given charset, LOCALE
      installs the locale charset unless [Glib.Convert.get_charset] reports
      that the locale is already UTF-8.
    @param fd the descriptor, passed through [some] before use
      (NOTE(review): [some] is not defined in this file; presumably it comes
      from Viz_misc and unwraps an option -- confirm).
    @return the configured channel. *)
let setup_channel ~nonblock encoding fd =
  let channel = Giochannel.new_fd (some fd) in
  let use_charset name = Giochannel.set_encoding channel (Some name) in
  if nonblock then Giochannel.set_flags_noerr channel [`NONBLOCK] ;
  begin
    match encoding with
    | `CHARSET name ->
        use_charset name
    | `LOCALE ->
        begin
          match Glib.Convert.get_charset () with
          | (true, _) -> ()
          | (false, name) -> use_charset name
        end
    | `NONE ->
        Giochannel.set_encoding channel None ;
        Giochannel.set_buffered channel false
  end ;
  channel

(** [all_done_cb ~nb cb] returns a thunk that counts completions.

    Each call of the returned thunk decrements a private counter that
    starts at [nb]; [cb ()] is invoked on the call that brings the counter
    to exactly zero, and on no other call. *)
let all_done_cb ~nb cb =
  let remaining = ref nb in
  let tick () =
    remaining := !remaining - 1 ;
    match !remaining with
    | 0 -> cb ()
    | _ -> () in
  tick

(* State attached to one watched pipe of the child process.
   - finished: set to true by [stop_watch]; consulted by [abort] to skip
               watches that have already been shut down.
   - name:     label used in debug traces (spawn uses stdin, stdout and
               stderr).
   - chan:     the channel being watched.
   - exn_cb:   called with the exception when a read or write on the
               channel fails.
   - done_cb:  called by [reset_watch] once the watch stops. *)
type watch = { 
    mutable finished : bool ;
    name : string ;
    chan : Giochannel.t ;
    exn_cb  : exn -> unit ;
    done_cb : unit -> unit ;
  }

(** [stop_watch w] marks [w] as finished and shuts its channel down.

    Errors raised by the shutdown ([Giochannel.Error] or
    [Glib.Convert.Error]) are only traced through [debug]; they are not
    propagated. *)
let stop_watch w =
  let close_pipe () =
    try Giochannel.shutdown w.chan true
    with
    | Giochannel.Error (_, reason) ->
        debug "%s cb: error closing pipe %s" w.name reason
    | Glib.Convert.Error (_, reason) ->
        debug "%s cb: error closing pipe %s" w.name reason in
  w.finished <- true ;
  debug "%s cb: closing pipe" w.name ;
  close_pipe ()

(** [reset_watch w continue] finishes a watch callback invocation.

    When [continue] is false the watch is stopped with [stop_watch] and its
    [done_cb] is run.  The value of [continue] is returned unchanged, so it
    can be used directly as the result of the watch callback. *)
let reset_watch w continue =
  if continue
  then true
  else begin
    stop_watch w ;
    w.done_cb () ;
    false
  end

(** [in_channel_watch w input] installs a watch on [w.chan] that feeds
    [input] to the channel and returns the identifier of that watch.

    The callback writes from the current position whenever some input is
    left and the OUT condition is reported.  It keeps the watch alive after
    a successful or AGAIN write; it stops the watch (through [reset_watch])
    when nothing is left, when OUT is not among the reported conditions, or
    when the write raises [Giochannel.Error] / [Glib.Convert.Error], in
    which case the exception is first handed to [w.exn_cb]. *)
let in_channel_watch w input =
  let total = String.length input in
  let cursor = ref 0 in
  let on_ready conds =
    let pending = total - !cursor in
    debug "stdin  cb: %d left in buffer" pending ;

    let keep_writing =
      if pending > 0 && List.mem `OUT conds
      then begin
        let bytes_written = ref 0 in
        try
          begin
            match Giochannel.write_chars w.chan ~bytes_written ~off:!cursor input with
            | `NORMAL count ->
                debug "stdin  cb: wrote %d" count ;
                cursor := !cursor + count
            | `AGAIN ->
                debug "stdin  cb: EAGAIN ?"
          end ;
          (* The watch stays alive even when everything has just been
             written; the next invocation sees nothing pending and stops. *)
          true
        with
        | Giochannel.Error (_, msg)
        | Glib.Convert.Error (_, msg) as exn ->
            w.exn_cb exn ;
            debug "stdin  cb: error %s, wrote %d" msg !bytes_written ;
            false
      end
      else false in

    reset_watch w keep_writing in

  Giochannel.add_watch w.chan [ `OUT ; `HUP ; `ERR ] on_ready


(** [out_channel_watch w b] installs a watch on [w.chan] that accumulates
    everything read from the channel into the buffer [b], and returns the
    identifier of that watch.

    Data is read through a 4096-character scratch string shared by all
    invocations of the callback.  The watch stays alive after a successful
    or AGAIN read; it is stopped (through [reset_watch]) on EOF, when IN is
    not among the reported conditions, or when the read raises
    [Giochannel.Error] / [Glib.Convert.Error], in which case the exception
    is first handed to [w.exn_cb]. *)
let out_channel_watch w b =
  let chunk = String.create 4096 in
  let on_ready conds =
    let keep_reading =
      if not (List.mem `IN conds)
      then false
      else begin
        try
          begin
            match Giochannel.read_chars w.chan chunk with
            | `EOF ->
                debug "%s cb: eof" w.name ;
                false
            | `AGAIN ->
                debug "%s cb: AGAIN" w.name ;
                true
            | `NORMAL count ->
                debug "%s cb: read %d" w.name count ;
                Buffer.add_substring b chunk 0 count ;
                true
          end
        with
        | Giochannel.Error (_, msg)
        | Glib.Convert.Error (_, msg) as exn ->
            w.exn_cb exn ;
            debug "%s cb: error %s" w.name msg ;
            false
      end in

    reset_watch w keep_reading in

  Giochannel.add_watch w.chan [ `IN ; `HUP ; `ERR ] on_ready

(** [pid_watch pid callback] registers a child watch on [pid] with
    [Gspawn.add_child_watch] and returns whatever that call returns.

    When the watch fires, the exit status is traced through [debug] and
    then passed to [callback]; the result of [callback] is discarded. *)
let pid_watch pid callback =
  let on_exit exit_status =
    let () =
      debug "child %d exiting, status %d"
        (Gspawn.int_of_pid pid) exit_status in
    callback exit_status ;
    () in
  Gspawn.add_child_watch pid on_exit


(* Handle on a spawned subprocess, as returned by [spawn].
   - watches: every pipe watch registered by [spawn], paired with the
              identifier returned when the watch was installed; [abort]
              walks this list.
   - aborted: set by [abort]; when true, [spawn] does not invoke the
              completion callback.
   - status:  initialised to -1 by [spawn] and overwritten with the value
              reported by the child watch. *)
type t = {
    mutable watches : (watch * Giochannel.source_id) list ;
    mutable aborted : bool ;
    mutable status  : int ;
  }

(** [spawn ~encoding ~cmd ~input ~reap_callback done_callback] starts [cmd]
    asynchronously with [Gspawn.async_with_pipes] and collects its output.

    @param encoding encoding applied to every pipe (see [setup_channel]).
    @param cmd the command line, as a list of strings; it is also echoed on
      stderr when [Viz_misc.debug] returns true for the exec category.
    @param input optional string fed to the child on its standard input;
      a stdin pipe is requested only when this is not [None].
    @param reap_callback run when the child watch fires, before the pid is
      closed.  It is best-effort: an exception it raises does not prevent
      completion, but is now traced through [debug] instead of being
      dropped silently.
    @param done_callback invoked once, when every pipe watch has stopped
      and the child watch has fired, with the collected exceptions (most
      recent first), the accumulated stdout and stderr contents and the
      status reported by the child watch.  It is not invoked if the
      subprocess has been aborted by then.
    @return the handle describing the subprocess; it can be given to
      [abort]. *)
let spawn ~encoding ~cmd ~input:input_opt ~reap_callback done_callback =
  if Viz_misc.debug "exec"
  then Printf.eprintf "### exec: Running '%s'\n%!" (String.concat " " cmd) ;
  let has_input = input_opt <> None in
  let base_flags =
    [ `PIPE_STDOUT ; `PIPE_STDERR ;
      `SEARCH_PATH ; `DO_NOT_REAP_CHILD ] in

  let child_info =
    Gspawn.async_with_pipes
      (if has_input then `PIPE_STDIN :: base_flags else base_flags)
      cmd in

  let state = { watches = [] ; aborted = false ; status = -1 } in

  let out_buffer = Buffer.create 4096 in
  let err_buffer = Buffer.create 1024 in
  let exn_list = ref [] in

  (* One completion per pipe watch (stdin only when there is input) plus
     one for the child watch. *)
  let all_done =
    all_done_cb
      ~nb:(if has_input then 4 else 3)
      (fun () ->
        if not state.aborted
        then
          done_callback
            ~exceptions:!exn_list
            ~stdout:(Buffer.contents out_buffer)
            ~stderr:(Buffer.contents err_buffer) state.status) in

  let exn_cb exn =
    exn_list := exn :: !exn_list in

  let make_watch name chan =
    { name = name ; finished = false ; chan = chan ;
      exn_cb = exn_cb ; done_cb = all_done } in

  let register w id =
    state.watches <- (w, id) :: state.watches in

  (* Shared setup for the two pipes read from the child. *)
  let watch_output name fd buffer =
    let w = make_watch name (setup_channel ~nonblock:false encoding fd) in
    register w (out_channel_watch w buffer) in

  if has_input then begin
    let ic =
      setup_channel ~nonblock:true encoding child_info.Gspawn.standard_input in
    let in_watch = make_watch "stdin" ic in
    register in_watch (in_channel_watch in_watch (some input_opt))
  end ;

  watch_output "stdout" child_info.Gspawn.standard_output out_buffer ;
  watch_output "stderr" child_info.Gspawn.standard_error err_buffer ;

  let pid = some child_info.Gspawn.pid in
  let on_exit exit_status =
    state.status <- exit_status ;
    begin
      (* Best-effort: a failing reap callback must not prevent the pid from
         being closed nor the completion from being counted, but the
         failure is traced rather than hidden. *)
      try reap_callback ()
      with exn ->
        debug "reap callback raised %s" (Printexc.to_string exn)
    end ;
    Gspawn.close_pid pid ;
    all_done () in
  ignore (pid_watch pid on_exit) ;

  state

(* Signature of a completion callback, matching the way [spawn] invokes
   its [done_callback] argument: the exceptions collected from the pipe
   watches, the accumulated stdout and stderr contents, and the status
   stored by the child watch. *)
type callback = 
  exceptions:exn list -> 
  stdout:string -> 
  stderr:string -> 
  int -> unit

(** [abort sub_data] cancels the pipe watches of a spawned subprocess.

    On the first call the handle is flagged as aborted, and every watch
    that is not yet finished is removed and stopped with [stop_watch].
    Later calls do nothing. *)
let abort sub_data =
  let cancel (w, id) =
    if not w.finished then begin
      Giochannel.remove_watch id ;
      stop_watch w
    end in
  match sub_data.aborted with
  | true -> ()
  | false ->
      sub_data.aborted <- true ;
      List.iter cancel sub_data.watches