File: mapply.ml

package info (click to toggle)
herdtools7 7.58-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,732 kB
  • sloc: ml: 128,583; ansic: 3,827; makefile: 670; python: 407; sh: 212; awk: 14
file content (313 lines) | stat: -rw-r--r-- 9,491 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
(****************************************************************************)
(*                           the diy toolsuite                              *)
(*                                                                          *)
(* Jade Alglave, University College London, UK.                             *)
(* Luc Maranget, INRIA Paris-Rocquencourt, France.                          *)
(*                                                                          *)
(* Copyright 2015-present Institut National de Recherche en Informatique et *)
(* en Automatique and the authors. All rights reserved.                     *)
(*                                                                          *)
(* This software is governed by the CeCILL-B license under French law and   *)
(* abiding by the rules of distribution of free software. You can use,      *)
(* modify and/ or redistribute the software under the terms of the CeCILL-B *)
(* license as circulated by CEA, CNRS and INRIA at the following URL        *)
(* "http://www.cecill.info". We also give a copy in LICENSE.txt.            *)
(****************************************************************************)
open Printf

(* How the output of a task is brought back to mapply:
   - [Buff]: the output is read from the task's pipe into an in-memory buffer;
   - [File]: the task's output is redirected to a temporary file, whose
     contents are copied to stdout once the task is over. *)
type mode = Buff | File
(* Task engine *)

(* Configuration of the task engine. *)
module type TArg = sig
  (* Command to run on each token. *)
  val com : string
  (* Arguments inserted between the command and the token. *)
  val comargs : string list
  (* Verbosity level: the engine prints more progress messages on stderr
     as this value grows (thresholds used are > 0, > 1 and > 2). *)
  val verbose : int
  (* How task output is collected, see type [mode]. *)
  val mode : mode
end

module Task(A:TArg) = struct
  let stdout_chan = stdout
  open Unix
  module W = Warn.Make(A)

  (* Exit status bookkeeping, sharing one private cell.
     [update_exit_status n] records [n] when it is non-zero and no non-zero
     status has been recorded yet; later statuses are ignored.
     [get_exit_status ()] returns the recorded status, 0 if there is none. *)
  let update_exit_status,get_exit_status =
    let first_failure = ref 0 in
    let record n =
      if !first_failure = 0 && n <> 0 then first_failure := n
    and current () = !first_failure in
    record,current

  (* One running child process.
     - [idx]: slot number of the task, also used in the output file name
       and in verbose messages;
     - [com]: the shell command line that was started;
     - [chan]: input channel connected to the process, as returned by
       [Unix.open_process_in];
     - [oname]: name of the temporary output file (written to in [File] mode);
     - [buff]: buffer accumulating the output in [Buff] mode. *)
  type task =
      { idx : int ; com : string ; chan : in_channel ;
        oname : string ; buff : Buffer.t; }
(* Fork utility *)
  (* Directory holding the temporary output files of [File] mode:
     "mapply.<pid>" inside the system temporary directory. *)
  let dir =
    let tmp_root = Filename.get_temp_dir_name () in
    let leaf = sprintf "mapply.%i" (getpid ()) in
    Filename.concat tmp_root leaf

  (* [rmrf dir] recursively deletes [dir] by running "/bin/rm -rf" through
     the shell.  The path is shell-quoted, so that a directory name holding
     spaces or shell metacharacters is passed to rm as one single argument.
     This is best effort: the exit status of the command is ignored. *)
  let rmrf dir =
    ignore (Sys.command (sprintf "/bin/rm -rf %s" (Filename.quote dir)))

  (* In [File] mode, make sure the temporary directory does not survive an
     interruption: on SIGINT, SIGQUIT, SIGTERM and SIGHUP the directory is
     deleted and the program exits with status 2.  Nothing is installed in
     [Buff] mode. *)
  let () =
    match A.mode with
    | Buff -> ()
    | File ->
        let cleanup_and_exit =
          Sys.Signal_handle (fun _ -> rmrf dir ; exit 2) in
        List.iter
          (fun signum -> Sys.set_signal signum cleanup_and_exit)
          [Sys.sigint; Sys.sigquit; Sys.sigterm; Sys.sighup;]


  (* Placeholder buffer shared by all tasks in [File] mode, where the
     per-task buffer is not used. *)
  let nobuff = Buffer.create 0

  (* [popen idx cmd args name] starts the shell command
     "cmd args... name" as task number [idx] and returns [Some task].

     - In [File] mode the standard output of the command is redirected to
       a file "<base>-<idx>.txt" in the temporary directory, [base] being
       the base name of [name] without its extension.
     - In [Buff] mode the output is read from the pipe, whose descriptor is
       set non-blocking.

     Returns [None], after a warning, when [name] has no extension
     (whatever the mode).

     [name] and the output file name are shell-quoted, so that tokens that
     contain spaces or shell metacharacters reach the command as one single
     argument and cannot alter the redirection.  [cmd] and [args] are
     inserted verbatim: they are shell text supplied by the user. *)
  let popen idx cmd args name =
    try
      let base =
        try
          Filename.chop_extension (Filename.basename name)
        with Invalid_argument _ ->
          Warn.warn_always "Ignoring file %s, since it has no extension" name ;
          raise Exit in
      let oname = Filename.concat dir (sprintf "%s-%02i.txt" base idx) in
      let com =
        let opts = match args with
        | [] -> ""
        | _::_ -> " " ^ String.concat " " args in
        let qname = Filename.quote name in
        match A.mode with
        | File -> sprintf "%s%s %s>%s" cmd opts qname (Filename.quote oname)
        | Buff -> sprintf "%s%s %s" cmd opts qname in
      if A.verbose > 2 then eprintf "Starting: '%s' on %02i\n" com idx ;
      let chan = Unix.open_process_in com in
      begin match A.mode with
      | File -> ()
      | Buff -> set_nonblock (descr_of_in_channel chan)
      end ;
      let buff = match A.mode with  Buff -> Buffer.create 128 | File -> nobuff in
      Some { com; idx; buff; chan; oname;}
    with Exit -> None

  (* Running tasks, indexed by the file descriptor of their pipe. *)
  let table = Hashtbl.create 17

  (* Keys of [table], i.e. the descriptors of all running tasks. *)
  let get_waiting () =
    let collect fd _task acc = fd :: acc in
    Hashtbl.fold collect table []

  (* [start_task idx (nrun,iter)] starts, in slot [idx], a task for the next
     token of [iter] that [popen] accepts; tokens rejected by [popen] are
     skipped.  The started task is registered in [table].

     Returns the updated state: [nrun+1] and the remaining iterator when a
     task was started, [nrun] and [None] when the tokens are exhausted.  The
     state is returned unchanged when the iterator is already [None]. *)
  let rec start_task idx (nrun,iter as k) =
    match iter with
    | None -> k
    | Some pending ->
        begin match Misc.next_iter pending with
        | None -> nrun,None
        | Some (name,rest) ->
            begin match popen idx A.com A.comargs name with
            | None -> start_task idx (nrun,Some rest)
            | Some task ->
                Hashtbl.add table (descr_of_in_channel task.chan) task ;
                if A.verbose > 1 then eprintf "Start %02i\n%!" idx ;
                nrun+1,Some rest
            end
        end

  (* Size, in bytes, of the chunks used to copy task output; the value is
     presently the same in both modes. *)
  let sz = match A.mode with File -> 1024 | Buff -> 1024

  (* Emit a warning that describes how a task terminated. *)
  let warn_status st =
    let describe = function
      | WEXITED code -> sprintf "exit %i" code
      | WSIGNALED signum -> sprintf "signaled %i" signum
      | WSTOPPED signum -> sprintf "stopped %i" signum in
    Warn.warn_always "task ended with %s" (describe st)

  (* [to_stdout oname] copies the contents of file [oname] to standard
     output by chunks of [sz] bytes, flushes standard output, and finally
     deletes the file. *)
  let to_stdout oname =
    let copy chan =
      let chunk = Bytes.create sz in
      let rec pump () =
        let n = input chan chunk 0 sz in
        (* [input] returns 0 at end of file only. *)
        if n <> 0 then begin
          output stdout_chan chunk 0 n ;
          pump ()
        end in
      pump () in
    Misc.input_protect copy oname ;
    flush stdout_chan ;
    Sys.remove oname

  (* [task_file (nrun,files) fd] handles, in [File] mode, the task whose pipe
     [fd] was reported readable by [select].  The task is unregistered and
     its process is waited for, then:
     - on normal exit, the exit code is recorded and the output file is
       copied to stdout (and deleted by [to_stdout]);
     - otherwise a warning is issued and the output is dropped.  The output
       file is nevertheless removed (best effort), since a leftover file
       would prevent the final removal of the temporary directory.
     Finally a new task is started in the freed slot; the updated
     (running count, remaining tokens) state is returned. *)
  let task_file (nrun,files) fd =
    let task =
      try Hashtbl.find table fd
      with Not_found -> assert false in
    Hashtbl.remove table fd ;
    begin match close_process_in task.chan with
    | WEXITED n ->
        update_exit_status n ;
        to_stdout task.oname
    | st ->
        warn_status st ;
        (* The file may not exist if the shell never got to create it. *)
        (try Sys.remove task.oname with Sys_error _ -> ())
    end ;
    start_task task.idx (nrun-1,files)


  (* [to_buff fd t] drains what is presently available on the non-blocking
     descriptor [fd] into [t.buff], by chunks of at most [sz] bytes.

     Returns [true] when end of file was reached (the task output is
     complete), [false] when the read would block (more output may come).
     Any other [Unix_error] is propagated to the caller.

     Only the [read] call is guarded by the exception handler, so that the
     recursive call is a tail call and the stack does not grow with the
     amount of data read.  Data are appended with [Buffer.add_subbytes],
     which avoids an intermediate string copy per chunk. *)
  let to_buff fd t =
    let buff = Bytes.create sz in
    let rec to_rec () =
      if A.verbose > 2 then eprintf "Read %02i\n%!" t.idx ;
      match read fd buff 0 sz with
      | exception Unix_error ((EWOULDBLOCK|EAGAIN),_,_) -> false
      | nread ->
          if A.verbose > 1 then eprintf "Got %i from %02i\n%!" nread t.idx ;
          if nread = 0 then true
          else begin
            Buffer.add_subbytes t.buff buff 0 nread ;
            to_rec ()
          end in
    to_rec ()

  (* [task_buff (nrun,files) fd] handles, in [Buff] mode, the task whose pipe
     [fd] was reported readable by [select].  Available output is moved to
     the task buffer.  If the task output is not complete yet, the state is
     returned unchanged.  Otherwise the task is unregistered and waited for;
     on normal exit its exit code is recorded and its buffer is written to
     stdout, on any other termination a warning is issued.  A new task is
     then started in the freed slot and the updated state is returned. *)
  let task_buff (nrun,files as k) fd =
    let task =
      match Hashtbl.find table fd with
      | t -> t
      | exception Not_found -> assert false in
    if not (to_buff fd task) then begin
      if A.verbose > 2 then eprintf "Again %02i\n%!" task.idx ;
      k
    end else begin
      if A.verbose > 1 then eprintf "Over %02i\n%!" task.idx ;
      Hashtbl.remove table fd ;
      let status = close_process_in task.chan in
      begin match status with
      | WEXITED n ->
          update_exit_status n ;
          Buffer.output_buffer stdout_chan task.buff ;
          flush stdout_chan
      | WSIGNALED _|WSTOPPED _ ->
          warn_status status
      end ;
      start_task task.idx (nrun-1,files)
    end

  (* Handler applied to each readable descriptor, selected once according to
     the communication mode. *)
  let process_task = match A.mode with
  | File -> task_file
  | Buff -> task_buff

  (* Debug printer: print on stderr the slot numbers of the tasks attached
     to the descriptors [ok], followed by a newline, and flush. *)
  let ppok ok =
    let show fd =
      match Hashtbl.find table fd with
      | task -> eprintf " %02i" task.idx
      | exception Not_found -> assert false in
    List.iter show ok ;
    eprintf "\n%!"

  (* Scheduling loop.  As long as tasks are running, wait for some of their
     pipes to become readable and apply [process_task] to each of them,
     threading the (running count, remaining tokens) state through.
     Returns once no task is running any more. *)
  let rec loop (nrun,_ as k) =
    if nrun > 0 then begin
      let fds = get_waiting () in
      assert (List.length fds = nrun) ;
      (* Negative timeout: wait without time limit. *)
      let ready,_,_ = select fds [] [] (-1.0) in
      let several = match ready with
      | _::_::_ -> true
      | []|[_] -> false in
      if several then begin
        if A.verbose > 0 then begin
          eprintf "Multiple select:" ;
          ppok ready
        end
      end else if A.verbose > 1 then begin
        eprintf "Select" ;
        ppok ready
      end ;
      loop (List.fold_left process_task k ready)
    end

  (* [run j names] applies the command to all tokens of [names], running up
     to [j] tasks at the same time.  Tokens are read from standard input
     when [names] is empty.  In [File] mode the temporary directory is
     created first and removed at the end; a failure to remove it is
     reported through [W.warn] only. *)
  let run j names =
    let tokens =
      Misc.mk_iter
        (match names with
        | [] -> Misc.fold_stdin Misc.cons []
        | _::_ -> names) in
    let file_mode = match A.mode with
    | File -> true
    | Buff -> false in
    if file_mode then mkdir dir 0o700 ;
    (* Fill slots j, j-1, ..., 1 with initial tasks. *)
    let rec start_rec k slot =
      if slot = 0 then k
      else start_rec (start_task slot k) (slot-1) in
    loop (start_rec (0,Some tokens) j) ;
    if file_mode then begin
      try rmdir dir
      with _ -> W.warn "Cannot delete directory %s" dir
    end
end


(* Command-line state, set while parsing the command line. *)
let args = ref []         (* non-option tokens, most recent first *)
let com = ref "echo"      (* command applied to each token *)
let verbose = ref 0       (* verbosity level, incremented by each -v *)
let j = ref 1             (* number of simultaneous tasks *)
let do_exit = ref false   (* whether to replicate a non-zero exit status *)
let mode = ref Buff       (* communication mode *)
let comargs = ref []      (* initial arguments of the command *)

(* [parse_mode tag] converts the argument of option -mode into a [mode]:
   "buff" is [Buff] and "file" is [File].
   Raises [Arg.Bad] on any other string. *)
let parse_mode tag = match tag with
| "buff" -> Buff
| "file" -> File
| _ ->
    raise
      (Arg.Bad (sprintf "%s: bad tag for option -mode" tag))

(* Name of a mode, as accepted by option -mode. *)
let pp_mode m = match m with
  | File -> "file"
  | Buff -> "buff"

(* Action of option -mode: parse [tag] and store the resulting mode.
   [parse_mode] raises [Arg.Bad] when [tag] is not a valid mode name. *)
let set_mode tag = mode := parse_mode tag

(* Usage message handed to [Arg.parse]; its first line shows the base name
   of the running executable. *)
let usage = String.concat "\n" [
  Printf.sprintf "Usage: %s [options] [<token> ...]" (Filename.basename Sys.argv.(0)) ;
  "" ;
  "Apply a command to every non-option token on the command-line. If none are" ;
  "provided, tokens are read from stdin. Tokens that start with `@` are" ;
  "interpreted as filepaths, and the lines of the file are read as tokens." ;
  "" ;
  "Options:" ;
]

(* Parse the command line.  Options update the references defined above;
   every non-option argument is pushed onto [args].
   -comargs may be given several times, its comma-separated arguments are
   appended in order. *)
let () =
  Arg.parse
    ["-v", Arg.Unit (fun () -> incr verbose)," be verbose";
     "-j", Arg.Int (fun i -> j := i),"<n> manage <n> simultaneous tasks" ;
     "-exit",
     Arg.Bool (fun b -> do_exit := b),
     Printf.sprintf "replicate (first) non-zero exit status, default %b"
       !do_exit;
     "-com", Arg.String (fun c -> com := c),"<com> set command (default echo)";
     "-comargs",
     Arg.String (fun args -> comargs := !comargs @ Misc.split_comma args),
     "<args> initial arguments for command (comma separated)";
     "-mode", Arg.String set_mode,
     sprintf
       "(buff|file) use either internal buffers or files for communication, default %s" (pp_mode !mode);]
    (fun arg -> args := arg :: !args)
    usage

(* Tokens given on the command line.
   NOTE(review): [args] is built by prepending, hence this list is in
   reverse command-line order, unless the Misc iterators restore the
   order -- to be confirmed. *)
let names = !args


(* Entry point.
   - With -j <= 1 the command is run on each token in turn, through
     [Sys.command].  The first non-zero status returned is remembered, so
     that option -exit also works in this sequential mode.  The token is
     shell-quoted, so that a token that contains spaces or shell
     metacharacters is passed to the command as one single argument.
   - Otherwise the task engine runs up to [!j] commands simultaneously.
   In both cases, when -exit is set, the program exits with the first
   non-zero status seen (0 if there is none). *)
let () =
  if !j <= 1 then begin
    let comargs = String.concat " " !comargs in
    let status = ref 0 in
    let do_test name =
      let com = sprintf "%s %s %s" !com comargs (Filename.quote name) in
      let st = Sys.command com in
      if st <> 0 && !status = 0 then status := st in
    Misc.iter_argv_or_stdin do_test names ;
    if !do_exit then exit !status
  end else begin
    let module T =
      Task
        (struct
          let com = !com
          let comargs = !comargs
          let verbose = !verbose
          let mode = !mode
        end) in
    T.run !j names ;
    if !do_exit then exit (T.get_exit_status ())
  end