File: message_passing.ml

package info (click to toggle)
marionnet 0.90.6+bzr508-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, sid
  • size: 9,532 kB
  • sloc: ml: 18,130; sh: 5,384; xml: 1,152; makefile: 1,003; ansic: 275
file content (102 lines) | stat: -rw-r--r-- 2,691 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
(* This file is part of Marionnet, a virtual network laboratory
   Copyright (C) 2007, 2008  Luca Saiu
   Copyright (C) 2010  Jean-Vincent Loddo
   Copyright (C) 2007, 2008, 2010  Université Paris 13

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. *)

(** A general-purpose message-passing facility, with transparent
    thread synchronization *)
class ['a] queue = object(self)
  val elements = ref []
  val mutex = Mutex.create ()
  val empty_condition = Condition.create ()

  (** This is not synchronized *)
  method private __empty =
    !elements = []

  method enqueue x =
    Mutex.lock mutex;
    elements := !elements @ [x];
    Condition.signal empty_condition;
    Mutex.unlock mutex

  (* This allows the user to use the queue as a deque, for 'urgent' messages, like
     thread termination requests: *)
  method prepend x =
    Mutex.lock mutex;
    elements := x :: !elements;
    Condition.signal empty_condition;
    Mutex.unlock mutex

  method dequeue : 'a =
    Mutex.lock mutex;
    while self#__empty do
      Condition.wait empty_condition mutex;
    done;
    let result =
      match !elements with
        x :: rest -> elements := rest; x
      | _ -> assert false in
    Mutex.unlock mutex;
    result
end;;

(*
let queue = new queue;;

let make_producer () =
  Thread.create
    (fun () ->
      while true do
        queue#enqueue (Random.int 1000);
      done)
    ();;

let make_consumer =
  let consumer_next_id = ref 1 in
  fun () ->
  let id = ! consumer_next_id in
  consumer_next_id := !consumer_next_id + 1;
  Thread.create
    (fun () ->
      while true do
        Log.printf "From consumer %i: got %i\n" id (queue#dequeue);
        flush_all ();
      done)
    ();;

let w = new task_runner;;

let make_producer x =
  Thread.create
    (fun () ->
      while true do
        w#schedule
          (fun () -> Log.printf "%i" x; flush_all ());
      done)
    ();;

let _ = make_producer 1;;
let _ = make_producer 2;;
let _ = make_producer 3;;
let _ = make_producer 4;;
let _ = make_producer 5;;
let _ = make_producer 6;;

Unix.sleep 30;;

w#terminate;;
*)