File: death_monitor.ml

package info (click to toggle)
marionnet 0.90.6+bzr508-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 9,532 kB
  • sloc: ml: 18,130; sh: 5,384; xml: 1,152; makefile: 1,003; ansic: 275
file content (194 lines) | stat: -rw-r--r-- 7,230 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
(* This file is part of Marionnet, a virtual network laboratory
   Copyright (C) 2007, 2008, 2009  Luca Saiu
   Copyright (C) 2010  Jean-Vincent Loddo
   Copyright (C) 2007, 2008, 2009, 2010  Université Paris 13

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. *)

open Unix;;
open Sys;;
open Mutex;;

(** This functionality allows the user to register a callback to be invoked in the event of
    the unexpected death of each given process. When a process death is detected
    the callback is invoked. Note that the polling code does not un-register the
    process by itself: unless it is removed with [stop_monitoring] (for example by
    the callback), the callback may be invoked again at the next poll.
    Process death is not detected immediately, as the implementation is based on
    polling. *)

(** Associative maps keyed by process ids (pids): *)
module OrderedInt = struct
  type t = int
  let compare (x : t) (y : t) = compare x y
end;;
module Map = Map.Make (OrderedInt);;

(** Name of the executable program being monitored: *)
type process_name = string;;

(** The registration table. Each monitored pid is bound to a triple made of
    the name of the executable program we're monitoring, the predicate telling
    whether the callback should be invoked (it receives the pid), and the
    callback itself (it receives the pid and the name). *)
type map =
  (process_name * (int -> bool) * (int -> process_name -> unit)) Map.t;;

(** Turn the registration table into an association list of
    (pid, (name, predicate, callback)) pairs, sorted by decreasing pid. *)
let linearize_map (map : map) =
  (* Map.bindings yields increasing keys; reverse to keep the historical
     decreasing order produced by folding and consing. *)
  List.rev (Map.bindings map);;

(** The registration table: each monitored pid is bound to its
    (name, predicate, callback) triple. *)
let processes_to_be_monitored : map ref = ref Map.empty;;

(** Time between two polls, in seconds. A zero or negative value makes the
    polling loop terminate. *)
let poll_interval : float ref = ref 1.0;;

(** Number of entries currently stored in processes_to_be_monitored: *)
let map_size : int ref = ref 0;;

(** Mutex protecting processes_to_be_monitored, poll_interval and map_size
    from concurrent accesses: *)
let death_monitor_mutex : Mutex.t = Mutex.create ();;

(** [__are_we_monitoring pid] is true iff [pid] is currently registered.
    Does not take death_monitor_mutex, so it is not thread-safe: internal use
    only, by code that already holds the mutex or does not need it. *)
let __are_we_monitoring pid =
  let table = !processes_to_be_monitored in
  Map.mem pid table;;

(** [are_we_monitoring pid] is true iff [pid] is currently registered.
    Thread-safe: the lookup is done while holding death_monitor_mutex. *)
let are_we_monitoring pid =
  Mutex.lock death_monitor_mutex;
  let monitored = __are_we_monitoring pid in
  Mutex.unlock death_monitor_mutex;
  monitored;;

(** Predicate used when none is given to start_monitoring: the callback must
    be invoked as soon as UnixExtra.is_process_alive reports the process as
    no longer alive. *)
let default_predicate pid =
  let alive = UnixExtra.is_process_alive pid in
  not alive;;

(** Start monitoring the process with the given pid. Call the given function if
    it ever dies, using the pid and process name as its parameters. This is
    thread-safe.
    [predicate] (default: default_predicate) receives the pid and tells
    whether the callback should be invoked.
    If the pid is already monitored, a warning is logged and the existing
    registration is kept unchanged. Otherwise the pid is registered and a
    thread is spawned which calls UnixExtra.Process.waitpid_non_intr on it.
    death_monitor_mutex is released even if something raises while it is held
    (e.g. thread creation failing); in that case the exception is logged and
    re-raised. *)
let start_monitoring ?(predicate=default_predicate) pid name callback =
  lock death_monitor_mutex;
  try
    (if Map.mem pid !processes_to_be_monitored then
      Log.printf1 "WARNING (THIS MAY BE SERIOUS): death_monitor: I was already monitoring %d\n" pid
    else begin
      processes_to_be_monitored :=
        Map.add pid (name, predicate, callback) !processes_to_be_monitored;
      map_size := !map_size + 1;
      (* We don't want to create zombies: let's asynchronously call waitpid on the process; this is
         important, otherwise other implementations of is_process_alive using kill with a 0 value for
         the signal will see the process as existing. *)
      ignore
        (Thread.create
           (fun () -> UnixExtra.Process.waitpid_non_intr ~wait_flags:[] pid)
           ())
    end);
    unlock death_monitor_mutex
  with e -> begin
    (* Don't leave the death_monitor_mutex locked when raising: *)
    unlock death_monitor_mutex;
    Log.printf1 "start_monitoring: re-raising %s.\n" (Printexc.to_string e);
    raise e
  end;;

(** Un-register the process with the given pid, or log a warning if it was
    not registered. Does not take death_monitor_mutex, so it is not
    thread-safe: internal use only. Users should call stop_monitoring
    instead. *)
let __stop_monitoring pid =
  if not (Map.mem pid !processes_to_be_monitored) then
    Log.printf1 "WARNING: death_monitor: I was not monitoring %d\n" pid
  else begin
    processes_to_be_monitored := Map.remove pid !processes_to_be_monitored;
    decr map_size
  end;;

(** Stop monitoring the process with the given pid. Thread-safe: the work is
    done while holding death_monitor_mutex, which is released even if the
    un-registration raises; in that case the exception is logged and
    re-raised. *)
let stop_monitoring pid =
  Mutex.lock death_monitor_mutex;
  (try
     __stop_monitoring pid
   with e ->
     (* Release the mutex before propagating the exception: *)
     Mutex.unlock death_monitor_mutex;
     Log.printf1 "stop_monitoring: re-raising %s.\n" (Printexc.to_string e);
     raise e);
  Mutex.unlock death_monitor_mutex;;

(** Check the status of all processes which were registered, and invoke callbacks
    if needed. Thread-safe, but only for internal use.
    A snapshot of the registrations is taken while holding death_monitor_mutex.
    Predicates and callbacks are run only after the mutex has been released,
    so they may call the locked functions of this module (e.g. stop_monitoring)
    without deadlocking. An exception raised by a predicate or a callback is
    logged and otherwise ignored, so that it does not prevent the remaining
    processes from being checked. *)
let poll () =
  lock death_monitor_mutex;
  let registrations = linearize_map !processes_to_be_monitored in
  unlock death_monitor_mutex;
  List.iter
    (fun (pid, (name, predicate, callback)) ->
      try
        if predicate pid then
          (* Only invoke the callback if we are *still* monitoring the process:
             it may have been un-registered since the snapshot was taken, for
             example by a callback invoked earlier in this same poll. This
             matters because processes tend to die in clusters, due to the fact
             that we often kill ALL the processes implementing a device if any
             single one fails. *)
          if are_we_monitoring pid then
            callback pid name
      with e ->
        (* Best effort: never let one faulty predicate/callback stop the
           polling, but don't hide the failure either. *)
        Log.printf1 "death_monitor: ignoring an exception raised while polling: %s\n"
          (Printexc.to_string e))
    registrations;;

(** Set the poll interval, in seconds. The new value becomes effective after
    the interval currently being waited expires. A zero or negative value makes
    the polling loop terminate. Thread-safe: the write is done while holding
    death_monitor_mutex. *)
let set_poll_interval seconds =
  Mutex.lock death_monitor_mutex;
  poll_interval := seconds;
  Mutex.unlock death_monitor_mutex;;

(** Get the current poll interval, in seconds. Thread-safe: the value is read
    while holding death_monitor_mutex.
    The argument is ignored (callers pass [()]); it is kept unconstrained so
    the function's type stays compatible with existing callers. *)
let get_poll_interval _ =
  lock death_monitor_mutex;
  let result = !poll_interval in
  unlock death_monitor_mutex;
  result;;

(** Repeatedly call poll. Between two polls it sleeps for the current
    interval, then re-reads the interval with get_poll_interval. The loop ends,
    logging a message, as soon as the interval is zero or negative. *)
let poll_in_a_loop interval_length =
  let interval = ref interval_length in
  while not (!interval <= 0.0) do
    poll ();
    (* we don't care very much if sleep is interrupted by a signal *)
    (try Thread.delay !interval with _ -> ());
    interval := get_poll_interval ()
  done;
  Log.printf "Exiting from the infinite polling loop.\n";;

(** Log a message, then run the polling loop in the calling thread, starting
    from the current poll interval. Returns only once the loop terminates. *)
let start_polling_loop () =
  Log.printf "Starting the infinite polling loop.\n";
  let initial_interval = get_poll_interval () in
  poll_in_a_loop initial_interval;;

(** Stop polling at the end of the current interval, by setting a negative
    poll interval. This goes through set_poll_interval, which locks
    death_monitor_mutex: it is thread safe, but it must not be called by a
    thread already holding that mutex (Mutex is not recursive). *)
let stop_polling_loop () =
  Log.printf "Stopping the infinite polling loop (locked).
If the program hangs at this point then you are probably using the
locked version within a callback. See the comment in death_monitor.ml .\n";
  set_poll_interval (~-. 1.0);;

(** Non-locking variant of stop_polling_loop: it writes poll_interval without
    taking death_monitor_mutex, and therefore is not thread-safe.
    NOTE(review): presumably meant for callers that already hold the mutex —
    confirm against call sites. *)
let __stop_polling_loop () =
  Log.printf "Stopping the infinite polling loop (non-locked).\n";
  (* Deliberately does not touch death_monitor_mutex: *)
  poll_interval := ~-. 1.0;;

(* Run the polling loop in a background thread, created when this module is
   initialized; the thread handle is not kept. *)
let (_ : Thread.t) = Thread.create start_polling_loop ();;