File: wvstream.h

package info (click to toggle)
wvstreams 4.0.2-4
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 6,420 kB
  • ctags: 6,518
  • sloc: cpp: 52,544; sh: 5,770; ansic: 810; makefile: 461; tcl: 114; perl: 18
file content (635 lines) | stat: -rw-r--r-- 23,124 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
/* -*- Mode: C++ -*-
 * Worldvisions Weaver Software:
 *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
 * 
 * Provides basic streaming I/O support.
 */ 
#ifndef __WVSTREAM_H
#define __WVSTREAM_H

#include "iwvstream.h"
#include "wvtimeutils.h"
#include <errno.h>
#include <limits.h>

#ifdef _WIN32
#include <time.h>
#include <Winsock2.h>
#include <ws2tcpip.h>
#else
#include <unistd.h> // not strictly necessary, but EVERYBODY uses this...
#include <sys/time.h>
#endif


/**
 * Unified support for streams, that is, sequences of bytes that may or
 * may not be ready for read/write at any given time.
 *
 * We provide typical read and write routines, as well as a select() function
 * for each stream.
 *
 * Most member functions are only declared here; only the small inline
 * helpers have their bodies in this header.
 */
class WvStream: public IWvStream
{
    IMPLEMENT_IOBJECT(WvStream);
public:
    /**
     * 'force' is the list of default SelectRequest values when you use the
     * variant of select() that doesn't override them.
     */
    SelectRequest force;
    
    /**
     * If this is set, select() doesn't return true for read unless the
     * given stream also returns true for write.
     */
    WvStream *read_requires_writable;

    /**
     * If this is set, select() doesn't return true for write unless the
     * given stream also returns true for read.
     */
    WvStream *write_requires_readable;
    
    /** If this is set, enables the use of continue_select(). */
    bool uses_continue_select;

    /** Specifies the stack size to reserve for continue_select(). */
    size_t personal_stack_size;

    /**
     * This will be true during callback execution if the
     * callback was triggered by the alarm going off.
     */
    bool alarm_was_ticking;
    
    /** True if noread()/nowrite()/close() have been called, respectively. */
    bool stop_read, stop_write, closed;
    
    /** Basic constructor for just a do-nothing WvStream */
    WvStream();
    virtual ~WvStream();

    /**
     * Close the stream if it is open; isok() becomes false from now on.
     * Note!!  If you override this function in a derived class, you must
     *   call it yourself from your destructor.  WvStream::~WvStream()
     *   can only call WvStream::close() because of the way virtual
     *   functions work in C++.
     */
    virtual void close();

    /** Override seterr() from WvError so that it auto-closes the stream. */
    virtual void seterr(int _errnum);

    /** Set a string error; forwards directly to WvErrorBase::seterr(). */
    void seterr(WvStringParm specialerr)
        { WvErrorBase::seterr(specialerr); }

    /** Format the arguments into a WvString and hand it to seterr(). */
    void seterr(WVSTRING_FORMAT_DECL)
        { seterr(WvString(WVSTRING_FORMAT_CALL)); }
    
    /** return true if the stream is actually usable right now */
    virtual bool isok() const;
    
    /** read a data block on the stream.  Returns the actual amount read. */
    virtual size_t read(void *buf, size_t count);

    /**
     * Read exactly count bytes from the stream.
     *
     * Notes:
     *      must be using continue_select to use this function.
     *      if timeout strikes or !isok() before count bytes could be read,
     *          nothing is read and 0 is returned.
     *      resets queuemin to 0.
     *
     * FIXME: yes, that means if the stream closes, continue_read might not
     * read the last bit of data.  You can use read() for that if you want.
     */
    virtual size_t continue_read(time_t wait_msec, void *buf, size_t count);

    /** Read exactly count bytes from the stream, using continue_select(). */
    virtual size_t continue_read(time_t wait_msec, WvBuf &outbuf,
				 size_t count);

    /**
     * Reads up to 'count' bytes of data from the stream into the buffer.
     * Returns the actual amount read.
     *
     * If 'count' is greater than the amount of free space available
     * in the buffer, only reads at most that amount.  You should
     * specify a reasonable upper bound on how much data should
     * be read at once.
     */
    virtual size_t read(WvBuf &outbuf, size_t count);

    /**
     * Puts data back into the stream's internal buffer.  We cheat so that
     * there's no restriction on how much (or what) data can be unread().
     * This is different from WvBuf::unget() (which is rather restrictive).
     */
    virtual void unread(WvBuf &outbuf, size_t count);

    /**
     * Write data to the stream.  Returns the actual amount written.
     * Since WvStream has an output buffer, it *always* successfully "writes"
     * the full amount (but you might have to flush the buffers later so it
     * actually gets sent).
     *
     * See outbuf_limit() for the exception to "always".
     */
    virtual size_t write(const void *buf, size_t count);

    /**
     * Writes data to the stream from the given buffer.
     * Returns the actual amount written.
     *
     * If count is greater than the amount of data available in
     * the buffer, only writes at most that amount.  The default count
     * of INT_MAX therefore means "everything currently in inbuf".
     */
    virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);

    /**
     * set the maximum size of outbuf, beyond which a call to write() will
     * return 0.  I need to do this for tape backups, since all I can do
     * is write to the loopback as fast as I can, which causes us to run
     * out of memory and get SIGABRT'd.  (dcoombs: 12/15/2000)
     *
     * FIXME: there must be a better way.  This confuses the semantics of
     * write(); can you trust it to always write all the bytes, or not?
     */
    void outbuf_limit(size_t size)
        { max_outbuf_size = size; }

    /**
     * noread()/nowrite(): once called, the stop_read/stop_write flags
     * above are documented to be true.  Definitions are not in this header.
     */
    virtual void noread();
    virtual void nowrite();

    /**
     * NOTE(review): undocumented in the original; presumably closes the
     * stream once both directions have been shut down -- confirm against
     * the implementation.
     */
    virtual void maybe_autoclose();
    
    /**
     * Non-blocking readiness checks.  The multi-parameter select() below
     * recommends these as the replacement for a zero-timeout select().
     */
    virtual bool isreadable();
    virtual bool iswritable();
    
    /**
     * unbuffered I/O functions; these ignore the buffer, which is
     * handled by read().  Don't call these functions explicitly unless
     * you have a _really_ good reason.
     *
     * This is what you would override in a derived class.
     * The base implementation reads nothing and returns 0.
     */
    virtual size_t uread(void *buf, size_t count)
        { return 0; /* basic WvStream doesn't actually do anything! */ }

    /**
     * unbuffered I/O functions; these ignore the buffer, which is
     * handled by write().  Don't call these functions explicitly unless
     * you have a _really_ good reason.
     *
     * This is what you would override in a derived class.
     * The base implementation discards the data but returns count, i.e.
     * it reports that everything was written.
     */
    virtual size_t uwrite(const void *buf, size_t count)
        { return count; /* basic WvStream doesn't actually do anything! */ }
    
    /**
     * read up to one line of data from the stream and return a pointer
     * to the internal buffer containing this line.  If the end-of-line
     * 'separator' is encountered, it is removed from the string.  If
     * wait_msec times out before the end of line is found, returns NULL and
     * the line may be returned next time, or you can read what we have so
     * far by calling read().
     *
     * If wait_msec < 0, waits forever for a newline (often a bad idea!)
     * If wait_msec=0, never waits.  Otherwise, waits up to wait_msec
     * milliseconds until a newline appears.
     *
     * Readahead specifies the maximum amount of data that the stream is
     * allowed to read in one shot.
     *
     * It is expected that there will be no NUL characters on the line.
     *
     * If uses_continue_select is true, getline() will use continue_select()
     * rather than select() to wait for its timeout.
     */
    char *getline(time_t wait_msec, char separator = '\n',
		  int readahead = 1024);
    
    /**
     * read up to count characters into buf, up to and including the first
     * instance of separator.
     *
     * if separator is not found on input before timeout (usual semantics)
     * or stream close or error, or if count is 0, nothing is placed in buf
     * and 0 is returned.
     *
     * if your buffer is not large enough for line, call multiple times
     * until separator is found at end of buffer to retrieve the entire
     * line.
     *
     * Returns the number of characters that were put in buf.
     *
     * If uses_continue_select is true, getline() will use
     * continue_select() rather than select() to wait for its timeout.
     * NOTE(review): this paragraph names getline(); presumably the same
     * holds for read_until() itself -- confirm against the implementation.
     */
    size_t read_until(void *buf, size_t count, time_t wait_msec,
                      char separator);

    /**
     * force read() to not return any bytes unless 'count' bytes can be
     * read at once.  (Useful for processing Content-Length headers, etc.)
     * Use count==0 to disable this feature.
     *
     * WARNING: getline() sets queuemin to 0 automatically!
     */
    void queuemin(size_t count)
        { queue_min = count; }

    /**
     * drain the input buffer (read and discard data until select(0)
     * returns false)
     */
    void drain();
    
    /**
     * force write() to always buffer output.  This can be more efficient
     * if you write a lot of small segments and want to "coagulate" them
     * automatically.  To flush the output buffer, use flush() or select().
     *
     * Besides recording the delay setting, this also sets want_to_flush
     * to the opposite of is_delayed.
     */
    void delay_output(bool is_delayed)
    {
        outbuf_delayed_flush = is_delayed;
        want_to_flush = !is_delayed;
    }

    /**
     * if true, force write() to call flush() each time, the default
     * behaviour.  otherwise, flush() is granted special meaning when
     * explicitly invoked by the client and write() may empty the output
     * buffer, but will not explicitly flush().
     */
    void auto_flush(bool is_automatic)
        { is_auto_flush = is_automatic; }

    /**
     * flush the output buffer, if we can do it without delaying more than
     * msec_timeout milliseconds at a time.  (-1 means wait forever)
     *
     * Returns true if the flushing finished (the output buffer is empty).
     */
    virtual bool flush(time_t msec_timeout);

    /**
     * NOTE(review): undocumented in the original; presumably reports
     * whether the output buffer should be flushed right now (see
     * delay_output() and want_to_flush) -- confirm against the
     * implementation.
     */
    virtual bool should_flush();

    /**
     * flush the output buffer automatically as select() is called.  If
     * the buffer empties, close the stream.  If msec_timeout milliseconds
     * pass, close the stream.  After the stream closes, it will become
     * !isok() (and a WvStreamList can delete it automatically)
     */
    void flush_then_close(int msec_timeout);
    
    /**
     * pre_select() sets up for eventually calling ::select().
     * It adds the right fds to the read, write, and except lists in the
     * SelectInfo struct.
     *
     * Returns true if we already know this stream is ready, and there's no
     * need to actually do a real ::select().  Some streams, such as timers,
     * can be implemented by _only_ either returning true or false here after
     * doing a calculation, and never actually adding anything to the
     * SelectInfo.
     *
     * You can add your stream to any of the lists even if readable,
     * writable, or isexception isn't set.  This is what force_select()
     * does.  You can also choose not to add yourself to the list if you know
     * it would be useless right now.
     *
     * pre_select() is only called if isok() is true.
     *
     * pre_select() is allowed to reduce msec_timeout (or change it if it's
     * -1).  However, it's not allowed to _increase_ msec_timeout.
     */
    virtual bool pre_select(SelectInfo &si);
    
    /**
     * A more convenient version of pre_select() usable for overriding the
     * 'want' value temporarily: si.wants is replaced by r for the duration
     * of the virtual pre_select(si) call and restored before returning.
     */
    bool pre_select(SelectInfo &si, const SelectRequest &r)
    {
	SelectRequest oldwant = si.wants;   // remember the caller's request
	si.wants = r;
	bool val = pre_select(si);
	si.wants = oldwant;                 // put it back for the caller
	return val;
    }
    
    /**
     * Like pre_select(), but still exists even if you override the other
     * pre_select() in a subclass.  Sigh.
     * (Overriding one pre_select() overload in a subclass hides the other
     * overload by name; this differently-named wrapper stays reachable.)
     */
    bool xpre_select(SelectInfo &si, const SelectRequest &r)
        { return pre_select(si, r); }
    
    /**
     * post_select() is called after ::select(), and returns true if this
     * object is now ready.  Usually this is done by checking for this object
     * in the read, write, and except lists in the SelectInfo structure.  If
     * you want to do it in some other way, you should usually do it in
     * pre_select() instead.
     *
     * You may also want to do extra maintenance functions here; for example,
     * the standard WvStream::post_select tries to flush outbuf if it's
     * nonempty.  WvTCPConn might retry connect() if it's waiting for a
     * connection to be established.
     */
    virtual bool post_select(SelectInfo &si);

    /**
     * Like post_select(), but still exists even if you override the other
     * post_select() in a subclass.  Sigh.
     * (Forwards to the two-argument post_select() declared just below.)
     */
    bool xpost_select(SelectInfo &si, const SelectRequest &r)
        { return post_select(si, r); }
    
    /**
     * A more convenient version of post_select() usable for overriding the
     * 'want' value temporarily: si.wants is replaced by r for the duration
     * of the virtual post_select(si) call and restored before returning.
     */
    bool post_select(SelectInfo &si, const SelectRequest &r)
    {
	SelectRequest oldwant = si.wants;   // remember the caller's request
	si.wants = r;
	bool val = post_select(si);
	si.wants = oldwant;                 // put it back for the caller
	return val;
    }
    
    /**
     * Return true if any of the requested features are true on the stream.
     * If msec_timeout < 0, waits forever (bad idea!).  ==0, does not wait.
     * Otherwise, waits for up to msec_timeout milliseconds.
     *
     * **NOTE**
     *   select() is _not_ virtual!  To change the select() behaviour
     *   of a stream, override the pre_select() and/or post_select()
     *   functions.
     *
     * This version of select() sets forceable==true, so force_select
     * options are taken into account.  It requests none of
     * readable/writable/isexcept explicitly.
     *
     * You almost always use this version of select() with callbacks, like
     * this:  if (stream.select(1000)) stream.callback();
     *
     * If you want to read/write the stream in question, try using the other
     * variant of select().
     *
     * DEPRECATED.  Call runonce() instead.
     */
    bool select(time_t msec_timeout)
        { return _select(msec_timeout, false, false, false, true); }
    
    /**
     * Exactly the same as:
     *     if (select(timeout)) callback();
     *
     * ...except that the above is deprecated, because it assumes callbacks
     * aren't called automatically and that the return value of one-parameter
     * select() is actually meaningful.
     *
     * Update your main loop to call runonce() instead of the above.
     *
     * Almost all modern programs should use msec_timeout = -1.
     */
    void runonce(time_t msec_timeout = -1)
        { if (select(msec_timeout)) callback(); }
    
    /**
     * This version of select() sets forceable==false, so we use the exact
     * readable/writable/isexception options provided.
     *
     * You normally use this variant of select() when deciding whether you
     * should read/write a particular stream.  For example:
     *
     *     if (stream.select(1000, true, false))
     *             len = stream.read(buf, sizeof(buf));
     *
     * This variant of select() is probably not what you want with
     * most WvStreamLists, unless you know exactly what you're doing.
     *
     * WARNING: the difference between the one-parameter and multi-parameter
     * versions of select() is *incredibly* confusing.  Make sure you use the
     * right one!
     *
     * DEPRECATED.  Call isreadable() or iswritable() instead, if
     * msec_timeout was going to be zero.  Other values of msec_timeout are
     * not really recommended anyway.
     */
    bool select(time_t msec_timeout,
		bool readable, bool writable, bool isex = false)
        { return _select(msec_timeout, readable, writable, isex, false); }
    
    /**
     * Use force_select() to force one or more particular modes (readable,
     * writable, or isexception) to true when selecting on this stream.
     *
     * If an option is set 'true', we will select on that option when someone
     * does a select().  If it's set 'false', we don't change its force
     * status.  (To de-force something, use undo_force_select().)
     */
    void force_select(bool readable, bool writable, bool isexception = false);
    
    /**
     * Undo a previous force_select() - ie. un-forces the options which
     * are 'true', and leaves the false ones alone.
     */
    void undo_force_select(bool readable, bool writable,
			   bool isexception = false);
    
    /**
     * return to the caller from execute(), but don't really return exactly;
     * this uses WvCont::yield() to return to the caller of callback()
     * without losing our place in execute() itself.  So, next time someone
     * calls callback(), it will be as if continue_select() returned.
     *
     * NOTE: execute() won't be called recursively this way, but any
     * other member function might get called, or member variables changed,
     * or the state of the world updated while continue_select() runs.  Don't
     * assume that nothing has changed after a call to continue_select().
     *
     * NOTE 2: if you're going to call continue_select(), you should set
     * uses_continue_select=true before the first call to callback().
     * Otherwise your WvCont won't get created.
     *
     * NOTE 3: if msec_timeout >= 0, this uses WvStream::alarm().
     */
    bool continue_select(time_t msec_timeout);
    
    /**
     * you MUST run this from your destructor if you use continue_select(), or
     * very weird things will happen if someone deletes your object while in
     * continue_select().
     */
    void terminate_continue_select();

    /**
     * get the remote address from which the last data block was received.
     * May be NULL.  The pointer becomes invalid upon the next call to read().
     */
    virtual const WvAddr *src() const;
    
    /**
     * define the callback function for this stream, called whenever
     * the callback() member is run, and passed the 'userdata' pointer.
     */
    void setcallback(WvStreamCallback _callfunc, void *_userdata);
        
    /** Sets a callback to be invoked on close().  */
    void setclosecallback(WvStreamCallback _callfunc, void *_userdata);

    /**
     * set the callback function for this stream to an internal routine
     * that auto-forwards all incoming stream data to the given output
     * stream.
     */
    void autoforward(WvStream &s);

    /** Stops autoforwarding. */
    void noautoforward();

    /**
     * NOTE(review): presumably the "internal routine" that autoforward()
     * installs as the stream callback -- confirm against the
     * implementation.
     */
    static void autoforward_callback(WvStream &s, void *userdata);
    
    /**
     * A wrapper that's compatible with WvCont, but calls the "real" callback.
     */
    void *_callwrap(void *);
    
    /**
     * Actually call the registered callfunc and execute().
     */
    void _callback();
    
    /**
     * if the stream has a callback function defined, call it now.
     * otherwise call execute().
     *
     * NOTE(review): the comments on _callback() and execute() describe the
     * relationship between execute() and the user callback differently;
     * check the implementation before relying on either description.
     */
    virtual void callback();
    
    /**
     * set an alarm, ie. select() will return true after this many ms.
     * The alarm is cleared when callback() is called.
     */
    void alarm(time_t msec_timeout);

    /**
     * return the number of milliseconds remaining before the alarm will go
     * off; -1 means no alarm is set (infinity), 0 means the alarm has
     * been hit and will be cleared by the next callback().
     */
    time_t alarm_remaining();

    /**
     * print a preformatted WvString to the stream.
     * see the simple version of write() way up above.
     *
     * print() and operator() are plain aliases that forward to this
     * write().
     */
    size_t write(WvStringParm s)
        { return write(s.cstr(), s.len()); }
    size_t print(WvStringParm s)
        { return write(s); }
    size_t operator() (WvStringParm s)
        { return write(s); }

    /**
     * preformat and write() a string.  Both spellings build a WvString
     * from the format arguments and return whatever write() returns.
     */
    size_t print(WVSTRING_FORMAT_DECL)
	{ return write(WvString(WVSTRING_FORMAT_CALL)); }
    size_t operator() (WVSTRING_FORMAT_DECL)
        { return write(WvString(WVSTRING_FORMAT_CALL)); }

protected:
    // builds the SelectInfo data structure (runs pre_select)
    // returns true if there are callbacks to be dispatched
    //
    // all of the fields are filled in with new values
    // si.msec_timeout contains the time until the next alarm expires
    bool _build_selectinfo(SelectInfo &si, time_t msec_timeout,
        bool readable, bool writable, bool isexcept,
        bool forceable);

    // runs the actual select() function over the given
    // SelectInfo data structure, returns the number of descriptors
    // in the set, and sets the error code if a problem occurs
    int _do_select(SelectInfo &si);

    // processes the SelectInfo data structure (runs post_select)
    // returns true if there are callbacks to be dispatched
    bool _process_selectinfo(SelectInfo &si, bool forceable);

    // tries to empty the output buffer if the stream is writable
    // not quite the same as flush() since it merely empties the output
    // buffer asynchronously whereas flush() might have other semantics
    // also handles autoclose (eg. after flush)
    bool flush_outbuf(time_t msec_timeout);

    // called once flush() has emptied outbuf to ensure that any other
    // internal stream buffers actually do get flushed before it returns
    virtual bool flush_internal(time_t msec_timeout);
    
    // the real implementations for these are actually in WvFDStream, which
    // is where they belong.  But IWvStream needs them to exist for now, so
    // it's a hack.  In standard WvStream they return -1.
    virtual int getrfd() const;
    virtual int getwfd() const;
    
private:
    /**
     * The function that does the actual work of select().
     * Both public select() overloads forward here; they differ only in
     * the readable/writable/isexcept/forceable values they pass.
     */
    bool _select(time_t msec_timeout,
		 bool readable, bool writable, bool isexcept,
		 bool forceable);


protected:
    // input and output buffers
    WvDynBuf inbuf, outbuf;
    // user callback (see setcallback()) and close callback
    // (see setclosecallback())
    WvStreamCallback callfunc, closecb_func;
    // NOTE(review): presumably the WvCont-compatible context wrapping
    // _callwrap() when uses_continue_select is set -- confirm.
    WvCallback<void*,void*> call_ctx;
    // data pointers handed to callfunc and closecb_func
    // NOTE(review): inferred from the setcallback()/setclosecallback()
    // parameter names -- confirm.
    void *userdata;
    void *closecb_data;
    // set by outbuf_limit()
    size_t max_outbuf_size;
    // set by delay_output()
    bool outbuf_delayed_flush;
    // set by auto_flush()
    bool is_auto_flush;

    // Used to guard against excessive flushing when output is delayed
    // (see delay_output(), which sets this to !is_delayed)
    bool want_to_flush;

    // Used to ensure we don't flush recursively.
    bool is_flushing;

    size_t queue_min;		// minimum bytes to read(); set by queuemin()
    time_t autoclose_time;	// close eventually, even if output is queued
    WvTime alarm_time;          // select() returns true at this time
    WvTime last_alarm_check;    // last time we checked the alarm_remaining
    // NOTE(review): undocumented; presumably records that
    // WvStream::execute() was reached, to detect subclasses that forget
    // to call the parent execute() -- confirm.
    bool wvstream_execute_called;
    
    /**
     * Prevent accidental copying of WvStreams.
     *
     * Both operations are protected and have empty bodies: code outside
     * the class hierarchy cannot call them, and they copy no state.
     * NOTE(review): being protected rather than private, they are still
     * callable from derived classes.
     */
    WvStream(const WvStream &s) { }
    WvStream& operator= (const WvStream &s) { return *this; }

    /**
     * The callback() function calls execute(), and then calls the user-
     * specified callback if one is defined.  Do not call execute() directly;
     * call callback() instead.
     *
     * The default execute() function does nothing.
     *
     * Note: If you override this function in a derived class, you must
     * call the parent execute() yourself from the derived class.
     */
    virtual void execute();
    
    // every call to select() selects on the globalstream.
    static WvStream *globalstream;
};

/**
 * Global console streams.
 *
 * Each of these pointers can be reassigned while the program is running,
 * if desired, but MUST NOT be NULL.  They are only declared here; the
 * definitions are not in this header.
 */
extern WvStream *wvcon; // tied stdin and stdout stream
extern WvStream *wvin;  // stdin stream
extern WvStream *wvout; // stdout stream
extern WvStream *wverr; // stderr stream

#endif // __WVSTREAM_H