File: join_queue.ml

package info (click to toggle)
jocaml 4.01.0-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 16,736 kB
  • ctags: 23,836
  • sloc: ml: 111,262; ansic: 32,746; sh: 6,057; lisp: 4,230; makefile: 3,861; asm: 3,734; awk: 88; perl: 45; fortran: 21; sed: 19; cs: 9
file content (76 lines) | stat: -rw-r--r-- 2,357 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
(***********************************************************************)
(*                                                                     *)
(*                           Objective Caml                            *)
(*                                                                     *)
(*            Luc Maranget, projet Moscova, INRIA Rocquencourt         *)
(*                                                                     *)
(*  Copyright 2005 Institut National de Recherche en Informatique et   *)
(*  en Automatique.  All rights reserved.  This file is distributed    *)
(*  under the terms of the Q Public License version 1.0.               *)
(*                                                                     *)
(***********************************************************************)

(* $Id: join_queue.ml 7127 2005-10-13 17:28:32Z maranget $ *)

(* A mutex-protected bag of pending items, together with the
   synchronisation state used by the functions of this file. *)
type 'a t =
  {
    (* Pending items; [put] conses onto the front and [hard_get]
       pops from the front. *)
    mutable bag : 'a list ;
    (* Locked around every access made by [put], [get], [wait_empty]
       and [clean]. *)
    mutex : Mutex.t ;
    (* Waited on by [hard_get] when the bag is empty; signalled by
       [put] when [sender_waits] is set. *)
    cond : Condition.t ;
    (* Waited on by [hard_empty]; broadcast by [hard_get] and [clean]
       when [flush_wait] is positive. *)
    flush_cond : Condition.t ;
    (* Number of threads currently inside [wait_empty]. *)
    mutable flush_wait : int ;
    (* True while a thread is blocked on [cond] inside [hard_get]. *)
    mutable sender_waits : bool ;
  }

(* [create ()] returns a fresh queue: empty bag, newly allocated mutex
   and condition variables, no thread counted in [flush_wait], and
   [sender_waits] cleared. *)
let create () =
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  let flush_cond = Condition.create () in
  { bag = [] ; mutex ; cond ; flush_cond ;
    flush_wait = 0 ; sender_waits = false ; }

(* [put q x] adds [x] at the front of [q.bag] while holding [q.mutex].
   [q.cond] is signalled only when [q.sender_waits] is set, i.e. when a
   thread is blocked in [hard_get]. *)
let put q x =
  let m = q.mutex in
  Mutex.lock m ;
  q.bag <- x :: q.bag ;
  begin match q.sender_waits with
  | true -> Condition.signal q.cond
  | false -> ()
  end ;
  Mutex.unlock m
   

(* [hard_get q] removes and returns the item at the front of [q.bag].
   While the bag is empty it blocks on [q.cond], so it must be called
   with [q.mutex] locked (as [get] does).  Before each wait it
   broadcasts [q.flush_cond] if some thread is counted in
   [q.flush_wait], and it keeps [q.sender_waits] set for the duration
   of the wait. *)
let hard_get q =
  let rec await () =
    match q.bag with
    | first :: rest ->
        q.bag <- rest ;
        first
    | [] ->
        (* The bag is drained: wake the threads parked in [wait_empty]. *)
        if q.flush_wait > 0 then Condition.broadcast q.flush_cond ;
        assert (not q.sender_waits) ; (* sender is unique *)
        q.sender_waits <- true ;
        Condition.wait q.cond q.mutex ;
        q.sender_waits <- false ;
        (* Re-examine the bag after every wake-up. *)
        await () in
  await ()


(* [get q] takes [q.mutex], extracts one item with [hard_get] (which
   may block until an item is available), releases the mutex and
   returns the item. *)
let get q =
  let m = q.mutex in
  Mutex.lock m ;
  let item = hard_get q in
  Mutex.unlock m ;
  item

(* [hard_empty q] returns at once when the bag is empty and
   [q.sender_waits] is set.  In every other case it waits once on
   [q.flush_cond]; the state is not re-checked after the wake-up.
   It is called with [q.mutex] locked (see [wait_empty]). *)
let hard_empty q =
  let drained =
    match q.bag with
    | [] -> q.sender_waits
    | _ :: _ -> false in
  if not drained then Condition.wait q.flush_cond q.mutex

(* [wait_empty q] runs [hard_empty q] under [q.mutex], counting the
   calling thread in [q.flush_wait] for the duration of the call so
   that [hard_get] and [clean] know a broadcast on [q.flush_cond] is
   needed. *)
let wait_empty q =
  let m = q.mutex in
  Mutex.lock m ;
  q.flush_wait <- succ q.flush_wait ;
  hard_empty q ;
  q.flush_wait <- pred q.flush_wait ;
  Mutex.unlock m

(* [clean q do_it] empties the queue and applies [do_it] to every item
   that was pending, in bag order (front first).  The bag is detached
   under [q.mutex], [q.flush_cond] is broadcast if some thread is
   counted in [q.flush_wait], and [do_it] runs after the mutex has
   been released. *)
let clean q do_it =
  assert (not q.sender_waits) ; (* sender is unique *)
  let pending =
    begin
      Mutex.lock q.mutex ;
      let items = q.bag in
      q.bag <- [] ;
      if q.flush_wait > 0 then Condition.broadcast q.flush_cond ;
      Mutex.unlock q.mutex ;
      items
    end in
  List.iter do_it pending