1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
|
(* $Id$ *)
(* POSIX semaphores: A better implementation would use POSIX semaphores.
For each Netplex semaphore <name> we define
- One POSIX semaphore <prefix>_<name>
- One POSIX semaphore <prefix>_<name>_isprotected (only for
storing the protected attribute)
- If the semaphore is protected, another set of counters
<prefix>_<name>_<container> for every container. These cannot be
semaphores, because negative values are possible. A shared memory
segment is possible (memory cells are managed by the controller,
and RPC calls are used for this).
The Netplex operations map nicely to POSIX operations:
- Netplex create: sem_open with O_CREAT and O_EXCL
- Netplex increment: sem_post. If the semaphore is protected, the
container-specific counter is also incremented (and created with
value 0 if not existing).
- Netplex decrement w/o wait: sem_trywait. If the decrement is successful,
the container-specific counter is also decremented.
- Netplex decrement with wait: sem_wait. If the decrement is successful,
the container-specific counter is also decremented.
If the container crashes, the controller looks at the container-specific
counter, and calls sem_post or sem_trywait as often as the counter says:
sem_post for negative values, and sem_trywait for positive values.
*)
open Netplex_types
let int64_incr v =
v := Int64.succ !v
let int64_decr v =
v := Int64.pred !v
let release = ref (fun () -> ())
let plugin_i =
( object(self)
val mutable semaphores = Hashtbl.create 50
val mutable containers = Hashtbl.create 50
initializer (
release :=
(fun () ->
semaphores <- Hashtbl.create 1;
containers <- Hashtbl.create 1
)
)
method required = []
method program =
Netplex_ctrl_aux.program_Semaphore'V1
method ctrl_added _ =
()
method ctrl_unplugged ctrl =
List.iter
(fun cid ->
self # ctrl_container_finished ctrl cid false
)
ctrl#containers
method ctrl_receive_call ctrl cid procname procarg_val reply =
match procname with
| "ping" ->
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'ping'res ()))
| "increment" ->
let sem_name =
Netplex_ctrl_aux._to_Semaphore'V1'increment'arg procarg_val in
let r =
self # increment ctrl cid sem_name in
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'increment'res r))
| "decrement" ->
let proc_reply v =
let v' =
Netplex_ctrl_aux._of_Semaphore'V1'decrement'res v in
reply(Some v') in
let (sem_name, wait_flag) =
Netplex_ctrl_aux._to_Semaphore'V1'decrement'arg procarg_val in
self # decrement_async ctrl cid sem_name wait_flag proc_reply
| "get" ->
let sem_name =
Netplex_ctrl_aux._to_Semaphore'V1'get'arg procarg_val in
let (sem, _, _) = self # get_sem_tuple ctrl sem_name in
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'get'res !sem))
| "create" ->
let (sem_name, init_val, protected) =
Netplex_ctrl_aux._to_Semaphore'V1'create'arg procarg_val in
let r =
snd(self # get_or_create_sem
ctrl sem_name init_val protected) in
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'create'res r))
| "destroy" ->
let sem_name =
Netplex_ctrl_aux._to_Semaphore'V1'destroy'arg procarg_val in
self # destroy_sem ctrl sem_name;
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'destroy'res ()))
| _ ->
failwith "Unknown procedure"
method increment ctrl cid sem_name =
let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
let cont_sem = self # get_cont_sem cid sem_name protected in
int64_incr sem;
int64_incr cont_sem;
let semval = !sem in
if !sem = 1L then (
if not (Queue.is_empty waiting) then (
let (waiting_reply, waiting_cid) = Queue.take waiting in
let waiting_cont_sem =
self # get_cont_sem waiting_cid sem_name protected in
self#really_decrement sem waiting_cont_sem protected;
waiting_reply 0L
)
);
semval
method private decrement_async ctrl cid sem_name wait_flag reply =
let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
let cont_sem = self # get_cont_sem cid sem_name protected in
if !sem > 0L then (
self#really_decrement sem cont_sem protected;
reply !sem
)
else (
if wait_flag then
Queue.push (reply,cid) waiting
else
reply (-1L)
)
method private really_decrement sem cont_sem protected =
assert(!sem > 0L);
int64_decr sem;
if protected then
int64_decr cont_sem
method private get_or_create_sem ctrl sem_name init_val protected =
try
(Hashtbl.find semaphores (ctrl, sem_name), false)
with Not_found ->
let waiting = Queue.create() in
let new_sem = (ref init_val, protected, waiting) in
Hashtbl.add semaphores (ctrl, sem_name) new_sem;
(new_sem, true)
method create_sem ctrl sem_name init_val protected =
snd(self # get_or_create_sem ctrl sem_name init_val protected)
method get_sem_tuple ctrl sem_name =
fst(self # get_or_create_sem ctrl sem_name 0L true)
method get_sem ctrl sem_name =
let ((value,_,_),_) = self # get_or_create_sem ctrl sem_name 0L true in
!value
method private get_cont_sem cid sem_name protected =
if protected then (
let ht =
try Hashtbl.find containers cid
with Not_found ->
let new_ht = Hashtbl.create 1 in
Hashtbl.add containers cid new_ht;
new_ht in
try
Hashtbl.find ht sem_name
with Not_found ->
let new_sem = ref 0L in
Hashtbl.add ht sem_name new_sem;
new_sem
)
else (ref 0L)
method destroy_sem ctrl sem_name =
try
let (_,_,waiting) = Hashtbl.find semaphores (ctrl, sem_name) in
let q = Queue.create() in
Queue.transfer waiting q;
Queue.iter
(fun (waiting_reply, waiting_cid) ->
let ht = Hashtbl.find containers waiting_cid in
Hashtbl.remove ht sem_name;
waiting_reply (-1L)
)
q
with Not_found -> ()
method ctrl_container_finished ctrl cid _ =
try
let ht = Hashtbl.find containers cid in (* or Not_found *)
let sems = ref [] in
Hashtbl.iter
(fun sem_name value ->
(*Netlog.logf `Debug "semaphore shutdown name=%s d=%Ld"
sem_name !value;
*)
let (sem, _, waiting) = self # get_sem_tuple ctrl sem_name in
let zero_flag = (!sem = 0L) in
sem := Int64.sub !sem !value;
if !sem < 0L then sem := 0L;
if zero_flag && !sem > 0L then
sems := sem_name :: !sems
)
ht;
List.iter
(fun sem_name ->
let (sem, protected, waiting) =
self # get_sem_tuple ctrl sem_name in
let v = ref !sem in
while not(Queue.is_empty waiting) && !v > 0L do
let (waiting_reply,waiting_cid) = Queue.take waiting in
let waiting_cont_sem =
self # get_cont_sem waiting_cid sem_name protected in
self#really_decrement sem waiting_cont_sem protected;
waiting_reply 0L;
int64_decr v
done
)
!sems;
Hashtbl.remove containers cid
with
| Not_found -> ()
end
)
let plugin = (plugin_i :> plugin)
let () =
(* Release memory after [fork]: *)
Netsys_posix.register_post_fork_handler
(object
method name = "Netplex_semaphore"
method run () = !release()
end
)
let increment sem_name =
let cont = Netplex_cenv.self_cont() in
Netplex_ctrl_aux._to_Semaphore'V1'increment'res
(cont # call_plugin plugin "increment"
(Netplex_ctrl_aux._of_Semaphore'V1'increment'arg sem_name))
let decrement ?(wait=false) sem_name =
let cont = Netplex_cenv.self_cont() in
Netplex_ctrl_aux._to_Semaphore'V1'decrement'res
(cont # call_plugin plugin "decrement"
(Netplex_ctrl_aux._of_Semaphore'V1'decrement'arg (sem_name, wait)))
let get sem_name =
try
match Netplex_cenv.self_obj() with
| `Container cont ->
let cont = Netplex_cenv.self_cont() in
Netplex_ctrl_aux._to_Semaphore'V1'get'res
(cont # call_plugin plugin "get"
(Netplex_ctrl_aux._of_Semaphore'V1'get'arg sem_name))
| `Controller ctrl ->
plugin_i # get_sem ctrl sem_name
with
| Not_found ->
raise Netplex_cenv.Not_in_container_thread
let create ?(protected=false) sem_name init_val =
try
match Netplex_cenv.self_obj() with
| `Container cont ->
Netplex_ctrl_aux._to_Semaphore'V1'create'res
(cont # call_plugin plugin "create"
(Netplex_ctrl_aux._of_Semaphore'V1'create'arg
(sem_name, init_val, protected)))
| `Controller ctrl ->
plugin_i # create_sem ctrl sem_name init_val protected
with
| Not_found ->
raise Netplex_cenv.Not_in_container_thread
let destroy sem_name =
try
match Netplex_cenv.self_obj() with
| `Container cont ->
Netplex_ctrl_aux._to_Semaphore'V1'destroy'res
(cont # call_plugin plugin "destroy"
(Netplex_ctrl_aux._of_Semaphore'V1'destroy'arg sem_name))
| `Controller ctrl ->
plugin_i # destroy_sem ctrl sem_name
with
| Not_found ->
raise Netplex_cenv.Not_in_container_thread
let ctrl_increment sem_name cid =
try
match Netplex_cenv.self_obj() with
| `Container cont ->
failwith "Netplex_semaphore.ctrl_increment: not in controller context"
| `Controller ctrl ->
plugin_i # increment ctrl cid sem_name
with
| Not_found ->
raise Netplex_cenv.Not_in_container_thread
|