File: joinPool.mli

package info (click to toggle)
jocaml 4.01.0-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 16,736 kB
  • ctags: 23,836
  • sloc: ml: 111,262; ansic: 32,746; sh: 6,057; lisp: 4,230; makefile: 3,861; asm: 3,734; awk: 88; perl: 45; fortran: 21; sed: 19; cs: 9
file content (220 lines) | stat: -rw-r--r-- 7,790 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
(***********************************************************************)
(*                                                                     *)
(*                           Objective Caml                            *)
(*                                                                     *)
(*            Luc Maranget, projet Moscova, INRIA Rocquencourt         *)
(*                                                                     *)
(*  Copyright 2004 Institut National de Recherche en Informatique et   *)
(*  en Automatique.  All rights reserved.  This file is distributed    *)
(*  under the terms of the Q Public License version 1.0.               *)
(*                                                                     *)
(***********************************************************************)

(* $Id$ *)

(** Dynamic dispatch of work among registered agents. *)


(** Simple implementation: use-once pools that can be used
    to dispatch one task. *)
module Simple : sig
(** {6 Enumerations} *)

type ('a, 'b) enum =
  { start : unit -> 'a;
    step : 'a -> ('b * 'a) option; }
(** Enumerations iterate over values in a functional way;
    ['a] is the type of the enumeration state,
    while ['b] is the type of the enumeration elements.

    [start] should return a new state pointing to the start of the enumeration.

    [step] should map a state to a (current element, next state) couple,
    returning [None] if there is no more element to return. *)

val enum_of_interval : int -> int -> (int, int) enum
(** [enum_of_interval inf sup] returns an enumeration that will iterate
    the interval from [int] to [sup]. *)

val enum_of_list : 'a list -> ('a list, 'a) enum
(** [enum_of_list l] returns an enumeration that will iterate over the
    elements of [l]. *)


(** {6 Pools} *)

type ('a, 'b, 'c) t =
  { register : ('a -> 'b) Join.chan;
    wait : unit -> 'c; }
(** Pools dispatch computations among registered agents, re-issuing pending
    tasks if agents do not send computation outcomes.

    Given a pool [p], returned by [create e comb y0]:
    - [p.register f] is used by agents to indicate that they can perform
      computations [f], mapping [xi] values to [yi] results.
    - [p.wait ()] returns the combined result [comb y1 (comb y2 (... (comb yn y0)))],
      where the [yi] values are
      the results of the [xi] applied to the functions
      registered by the agents. The [xi] are the values returned by
      the enumumeration specified at pool creation time. *)

val create : ('d, 'a) enum -> ('b -> 'c -> 'c) -> 'c -> ('a, 'b, 'c) t
(** [create e comb y0] returns a pool for computations of type ['a -> 'b],
    [comb] being used to combine results with initial result [y0].
    The enumeration [e] is used to generate the input values for the
    various computations. *)

end


(** Advanced implementation with functorial interface *)
module Shared : sig

(** This module provides advanced task management.
    Pools dispatch computations among registered agents, re-issuing pending
    tasks if agents do not send computation outcomes.
    It improves over the more simple {!Simple} in the following aspects:
    - The same pool can be shared by several computations.
    - More efficient handling of task re-issuing: fresh tasks have priority
      over re-issued tasks.
    - Ability to abort duplicated tasks when outcome reaches the pool.
    - A little control on pool behavior is offered by the means
      of the {!Config} module
      argument.
*)

(** {6 Arguments} *)


(** Configuration of pool *)
module type Config = sig
  val debug : bool
(** If true, gives a few diagnostics on the standard error stream. *)
  val nagain : int
(** A given task will be re-issued at most [nagain] times.
    No limit is enforced when [nagain] is strictly less that zero *)
end

(** Functional enumerations *)
module type Enumerable = sig


(** Signature of iterators: modules that
    offer a functional iterator over collections (type [t])
    of elements (type [elt]) in a functional way *)

  type t     (** Collection *)
  type elt   (** Elements in the collection *)
  type enum  (** Explicit state. *)


  val start : t -> enum
  (** Start iterating over a collection, [start c]
      returns the initial state *)

  val step : enum -> (elt * enum) option
  (** Iterate once, [step st] returns [None] when iteration is over,
      or [Some (e,st')] otherwise, where [e] is the next element and
      [st']  is the next explicit state. *)

(** An example: iterator over integer intervals:

{[module Interval = struct
  type t = { low : int; high : int; } (* Interval (low..high) *)
  type elt = int
  type enum = { next : int; max : int; }

  let start x = { next=x.low; max=x.high; }

  let step x =
    if x.next > x.max then None
    else Some (x.next, { x with next=x.next+1; })
end]}
*)

(** Another example: iterator over a list:

{[module  ListMake(E:sig type elt end) = struct
  type t = E.elt list
  type elt = E.elt
  type enum = t

  let start xs = xs

  let step = function
  | [] -> None
  | x::xs -> Some (x,xs)

end]}

*)

end


  

(** {6 Pools} *)

  type ('elt,'partial) worker = 'elt -> 'partial
(** Standard workers *)

  type subtask_id = int (** Subtask identifier *)

  type ('elt,'partial) interruptible_worker =
      subtask_id * 'elt -> 'partial option (** Workers that can be aborted asynchronously *)
  type kill = subtask_id Join.chan (** To abort given subtask *)


(** Output signature of the pool functor *)
module type S = sig
  type elt         (** Element from a collection *)
  type collection  (** Collection *)

  type ('partial, 'result) t = {
      register : (elt,'partial) worker Join.chan;
      register_interruptible :
        ((elt,'partial) interruptible_worker * kill) Join.chan;
      fold :
        collection -> ('partial -> 'result -> 'result) -> 'result -> 'result;
    }
(** Pools dispatch computations among registered agents, re-issuing pending
    tasks if agents do not send computation outcomes.

    Given a pool [p], returned by [create ()]:
    - [p.register w] is used by agents to indicate that they can perform
      computations, mapping [xi] values to [yi] results, using the
      synchronous channel [w].
    - [p.fold  c comb y0] returns the combined result
      [comb y1 (comb y2 (... (comb yn y0)))],
      where the [yi] values are the results of the [xi]
      transformed by the functions
      registered by the agents. The [xi] result from enumerating the collection
      [c]. The enumeration technique is specified by the module argument [E]
      (signature {!Enumerable})
      to the functor {!Make}.
    - [p.register_interruptible (w,k)] is used by agents to indicate that
      they can perform computations as above.
      Additionally the pool logics will attempt to abort computations
      found to be useless by issusing messages on channel [k].
      More specifically, when
      given an argument [(id,xi)] by the pool logics,
      the synchronous channel [w] should return [Some yi],
      where [xi] and [yi] are the same as in the description of [p.fold] above.
      However, if the pool sends [id] on channel [k] before [yi] is
      available, then the agent may abort the computation of [yi],
      so as to spare computing power.
      In that case, [w(xi)] must reply [None].
      It is the agent responsability to check that subtask identifiers
      sent on the [w] and [k] channels
      are equal before aborting the subtask and having [w] to reply [None] *)
     

  val create : unit ->  ('partial, 'result) t 
  (** Pool creator *)
end

(** Functor to build pools, given an enumeration technique specification ([E]) *)module Make (C:Config) (E:Enumerable) :
    S with type elt = E.elt and type collection = E.t

end