File: Stream.h

package info (click to toggle)
ace 6.4.5%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 48,640 kB
  • ctags: 41,204
  • sloc: cpp: 336,448; perl: 33,068; ansic: 20,676; sh: 3,735; exp: 787; python: 635; yacc: 511; xml: 330; lex: 158; lisp: 116; makefile: 80; csh: 20; tcl: 5
file content (250 lines) | stat: -rw-r--r-- 8,167 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// -*- C++ -*-

//==========================================================================
/**
 *  @file    Stream.h
 *
 *  @author Douglas C. Schmidt <schmidt@uci.edu>
 */
//==========================================================================

#ifndef ACE_STREAM_H
#define ACE_STREAM_H

#include /**/ "ace/pre.h"

#include /**/ "ace/config-all.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/IO_Cntl_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Module.h"
#if defined (ACE_HAS_THREADS)
# include "ace/Condition_Attributes.h"
#endif

ACE_BEGIN_VERSIONED_NAMESPACE_DECL

// Forward decls.
template<ACE_SYNCH_DECL, class TIME_POLICY> class ACE_Stream_Iterator;
class ACE_Time_Value;

/**
 * @class ACE_Stream
 *
 * @brief This class is the primary abstraction for the ASX framework.
 * It is moduled after System V Stream.
 *
 * A Stream consists of a stack of @c ACE_Modules, each of which
 * contains two @c ACE_Tasks.  Even though the methods in this
 * class are virtual, this class isn't really intended for
 * subclassing unless you know what you are doing.  In
 * particular, the ACE_Stream destructor calls <close>, which
 * won't be overridden properly unless you call it in a subclass
 * destructor.
 */
template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
class ACE_Stream
{
public:
  friend class ACE_Stream_Iterator<ACE_SYNCH_USE, TIME_POLICY>;

  enum
  {
    /// Indicates that @c close() deletes the Tasks.  Don't change this
    /// value without updating the same enum in class ACE_Module...
    M_DELETE = 3
  };

  // = Initializatation and termination methods.
  /**
   * Create a Stream consisting of @a head and @a tail as the Stream
   * head and Stream tail, respectively.  If these are 0 then the
   * ACE_Stream_Head and ACE_Stream_Tail are used, respectively.
   * @a arg is the value past in to the <open> methods of the tasks.
   */
  ACE_Stream (void *arg = 0,
              ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head = 0,
              ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail = 0);

  /**
   * Create a Stream consisting of @a head and @a tail as the Stream
   * head and Stream tail, respectively.  If these are 0 then the
   * ACE_Stream_Head and ACE_Stream_Tail are used, respectively.
   * @a arg is the value past in to the @c open() methods of the tasks.
   */
  virtual int open (void *arg,
                    ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head = 0,
                    ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail = 0);

  /// Close down the stream and release all the resources.
  virtual int close (int flags = M_DELETE);

  /// Close down the stream and release all the resources.
  virtual ~ACE_Stream (void);

  // = ACE_Stream plumbing operations

  /// Add a new module @a mod right below the Stream head.  The
  /// @c open() hook methods of the @c ACE_Tasks in this ACE_Module
  /// are invoked to initialize the tasks.
  virtual int push (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod);

  /// Remove the @a mod right below the Stream head and close it down.
  //  The <close()> hook methods of the <ACE_Tasks> in this ACE_Module
  /// are invoked to cleanup the tasks.
  virtual int pop (int flags = M_DELETE);

  /// Return the top module on the stream (right below the stream
  /// head).
  virtual int top (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *&mod);

  /// Insert a new module @a mod below the named module @a prev_name.
  virtual int insert (const ACE_TCHAR *prev_name,
                      ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod);

  /// Replace the named module @a replace_name with a new module @a mod.
  virtual int replace (const ACE_TCHAR *replace_name,
                       ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod,
                       int flags = M_DELETE);

  /// Remove the named module @a mod from the stream.  This bypasses the
  /// strict LIFO ordering of @c push and @c pop.
  virtual int remove (const ACE_TCHAR *mod,
                      int flags = M_DELETE);

  /// Return current stream head.
  virtual ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *head (void);

  /// Return current stream tail.
  virtual ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *tail (void);

  /// Find a particular ACE_Module.
  virtual ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *find (const ACE_TCHAR *mod);

  /// Create a pipe between two Streams.
  virtual int link (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &);

  /// Remove a pipe formed between two Streams.
  virtual int unlink (void);

  // = Blocking data transfer operations
  /**
   * Send the message @a mb down the stream, starting at the Module
   * below the Stream head.  Wait for upto @a timeout amount of
   * absolute time for the operation to complete (or block forever if
   * @a timeout == 0).
   */
  virtual int put (ACE_Message_Block *mb,
                   ACE_Time_Value *timeout = 0);

  /**
   * Read the message @a mb that is stored in the stream head.
   * Wait for upto @a timeout amount of absolute time for the operation
   * to complete (or block forever if @a timeout == 0).
   */
  virtual int get (ACE_Message_Block *&mb,
                   ACE_Time_Value *timeout = 0);

  /// Send control message down the stream.
  virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
                       void *args);

  /// Synchronize with the final close of the stream.
  virtual int wait (void);

  /// Dump the state of an object.
  virtual void dump (void) const;

  /// Declare the dynamic allocation hooks.
  ACE_ALLOC_HOOK_DECLARE;

private:
  /// Actually perform the unlinking of two Streams (must be called
  /// with locks held).
  int unlink_i (void);

  /// Actually perform the linking of two Streams (must be called with
  /// locks held).
  int link_i (ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &);

  /// Must a new module onto the Stream.
  int push_module (ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *,
                   ACE_Module<ACE_SYNCH_USE, TIME_POLICY> * = 0,
                   ACE_Module<ACE_SYNCH_USE, TIME_POLICY> * = 0);

  /// Pointer to the head of the stream.
  ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *stream_head_;

  /// Pointer to the tail of the stream.
  ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *stream_tail_;

  /// Pointer to an adjoining linked stream.
  ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> *linked_us_;

  // = Synchronization objects used for thread-safe streams.
  /// Protect the stream against race conditions.
  ACE_SYNCH_MUTEX_T lock_;

#if defined (ACE_HAS_THREADS)
  /// Attributes to initialize condition with.
  /* We only need this because some crappy compilers can't
     properly handle initializing the conditions with
     temporary objects. */
  ACE_Condition_Attributes_T<TIME_POLICY> cond_attr_;
#endif

  /// Use to tell all threads waiting on the close that we are done.
  ACE_SYNCH_CONDITION_T final_close_;
};

/**
 * @class ACE_Stream_Iterator
 *
 * @brief Iterate through an ACE_Stream.
 */
template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
class ACE_Stream_Iterator
{
public:
  // = Initialization method.
  ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE, TIME_POLICY> &sr);

  // = Iteration methods.

  /// Pass back the @a next_item that hasn't been seen in the set.
  /// Returns 0 when all items have been seen, else 1.
  int next (const ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *&next_item);

  /// Returns 1 when all items have been seen, else 0.
  int done (void) const;

  /// Move forward by one element in the set.  Returns 0 when all the
  /// items in the set have been seen, else 1.
  int advance (void);

private:
  /// Next ACE_Module that we haven't yet seen.
  ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *next_;
};

ACE_END_VERSIONED_NAMESPACE_DECL

#if defined (__ACE_INLINE__)
#include "ace/Stream.inl"
#endif /* __ACE_INLINE__ */

#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "ace/Stream.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */

#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
#pragma implementation ("Stream.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */

#include /**/ "ace/post.h"

#endif /* ACE_STREAM_H */