File: infinibuf.h

package info (click to toggle)
muchsync 6-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 580 kB
  • sloc: cpp: 3,860; sh: 982; makefile: 17
file content (323 lines) | stat: -rw-r--r-- 10,764 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
// -*- C++ -*-

#ifndef _INFINIBUF_H_
#define _INFINIBUF_H_ 1

/** \file infinibuf.h
 *  \brief iostreams-friendly buffers that can grow without bounds.
 */

#include <condition_variable>
#include <list>
#include <memory>
#include <thread>

/**
 * \brief Abstract buffer-management class for unbounded buffers.
 *
 * A derived class must at a minimum override either `notempty()` (for
 * output buffers) or `gwait()` (for input buffers).
 *
 * Most methods are not thread-safe.
 */
class infinibuf {
protected:
  static constexpr int default_startpos_ = 8;
  static constexpr int chunksize_ = 0x10000;

  std::list<char *> data_;
  int gpos_;
  int ppos_;
  bool eof_{false};
  int errno_{0};
  const int startpos_;		// For putback

  /** Called to signal when the buffer transitions from empty to
   *  non-empty. */
  virtual void notempty() {}

  /** Called when sufficient bytes are consumed to free some memory. */
  virtual void notfull() {}

public:
  explicit infinibuf(int sp = default_startpos_)
    : gpos_(sp), ppos_(sp), startpos_(sp) {
    data_.push_back(new char[chunksize_]);
  }
  infinibuf(const infinibuf &) = delete;
  virtual ~infinibuf() = 0;
  infinibuf &operator= (const infinibuf &) = delete;
		   
  // These functions are never thread safe:

  bool empty() { return data_.front() == data_.back() && gpos_ == ppos_; }
  bool eof() { return eof_; }
  std::size_t buffer_size() { return data_.size() * chunksize_; }
  int err() { return errno_; }
  void err(int num) { if (!errno_) errno_ = num; peof(); }

  char *eback() { return data_.front(); }
  char *gptr() { return eback() + gpos_; }
  int gsize() {
    return (data_.front() == data_.back() ? ppos_ : chunksize_) - gpos_;
  }
  char *egptr() { return gptr() + gsize(); }
  void gbump(int n);
  /** Called to wait for the buffer to be non-empty. */
  virtual void gwait() {}

  char *pbase() { return data_.back(); }
  char *pptr() { return pbase() + ppos_; }
  int psize() { return chunksize_ - ppos_; }
  char *epptr() { return pptr() + psize(); }
  void pbump(int n);
  void peof() { eof_ = true; if (empty()) notempty(); }
  /** Called to sleep if the buffer is too full. */
  virtual void pwait() {}

  // These functions are thread safe for some subtypes:

  /** By default `lock()` and `unlock()` do nothing, but threadsafe
   *  derived classes must override these functions. */
  virtual void lock() {}
  /** See comment at lock. */
  virtual void unlock() {}

  /** \brief Drain the current contents of the buffer.
   *
   * This function is thread safe and must be called *without* locking
   * the `infinibuf`.  If the `infinibuf` is already locked, deadlock
   * will ensue.
   *
   * \param fd The file descriptor to write to.
   * \return 0 at EOF if there is no point in ever calling `output`
   * again, -1 after EAGAIN, and 1 after successful output.
   * \throws runtime_error if the `write` system call fails and
   * `errno` is not `EAGAIN`. */
  int output(int fd);

  /** Fill the buffer from a file descriptor.
   *
   * This function is thread safe and must be called *without* locking
   * the `infinibuf`.
   *
   * \param fd The file descriptor to read from.
   * \return 0 at EOF if there is no point in ever calling
   * `input` again, 1 after successful input, and -1 after EAGAIN.
   * \throws runtime_error if the `read` system call fails and
   * `errno` is not `EAGAIN`. */
  int input(int fd);

  /** Calls `output` over and over in a loop on an `infinibuf`.
   *
   * \param ib The `infinibuf` on which to call `output`.
   *
   * \param fd The file descriptor to which to write consumed data.
   *
   * \param oblocked If non-null is called with `true` whenever the
   * output is blocked by flow control, and then called again with
   * `false` when the output becomes unblocked.
   */
  static void output_loop(std::shared_ptr<infinibuf> ib, int fd,
			  std::function<void(bool)> oblocked = nullptr);
  static void input_loop(std::shared_ptr<infinibuf> ib, int fd);
};

/** \brief An `infinibuf` that synchronously reads from a file
 *  descriptor when the buffer underflows.
 *
 *  Closes the file descriptor upon destruction. */
class infinibuf_infd : public infinibuf {
  const int fd_;
public:
  explicit infinibuf_infd (int fd, int sp = default_startpos_)
    : infinibuf(sp), fd_(fd) {}
  ~infinibuf_infd();
  void gwait() override { input(fd_); }
};

/** \brief An `infinibuf` that synchronously writes to a file
 *  descriptor when the buffer overflows or is synced.
 *
 *  Closes the file descriptor upon destruction. */
class infinibuf_outfd : public infinibuf {
  const int fd_;
  std::function<void(bool)> oblocked_;
  
public:
  explicit infinibuf_outfd (int fd,
			    std::function<void(bool)> oblocked = nullptr);
  ~infinibuf_outfd();
  void notempty() override;
};

/** \brief Thread-safe infinibuf.
 *
 * This infinibuf can safely be used in an `iostream` by one thread,
 * while a different thread fills or drains the buffer (for instance
 * executing `infinibuf::output_loop` or `infinibuf::input_loop`).
 */
class infinibuf_mt : public infinibuf {
  std::mutex m_;
  std::condition_variable cv_;
  std::condition_variable flow_ctrl_cv_;
  std::size_t max_buf_size_{0};
public:
  explicit infinibuf_mt (int sp = default_startpos_) : infinibuf(sp) {}
  void lock() override { m_.lock(); }
  void unlock() override { m_.unlock(); }
  void notempty() override { cv_.notify_all(); }
  void notfull() override { flow_ctrl_cv_.notify_all(); }
  void set_max_buf_size(std::size_t val) {
    std::lock_guard<infinibuf> _lk(*this);
    if (!val || val > max_buf_size_)
      notfull();
    max_buf_size_ = val;
  }
  void gwait() override {
    if (empty() && !eof()) {
      std::unique_lock<std::mutex> ul (m_, std::adopt_lock);
      while (empty() && !eof())
	cv_.wait(ul);
      ul.release();
    }
  }
  void pwait() override {
    if (max_buf_size_ && buffer_size() > max_buf_size_) {
      if (max_buf_size_ && buffer_size() > max_buf_size_) {
	std::unique_lock<std::mutex> ul (m_, std::adopt_lock);
	flow_ctrl_cv_.wait(ul);
	ul.release();
      }
    }
  }
};

/** \brief `infinibuf`-based `streambuf`.
 *
 * This streambuf can make use of any buffer type derived from
 * `infinibuf`.  The `infinibuf` is always converted to a
 * `shared_ptr`, even if it is passed in as a raw `infinibuf*`.
 */
class infinistreambuf : public std::streambuf {
protected:
  std::shared_ptr<infinibuf> ib_;
  int_type underflow() override;
  int_type overflow(int_type ch) override;
  int sync() override;
public:
  explicit infinistreambuf(std::shared_ptr<infinibuf> ib);
  explicit infinistreambuf(infinibuf *ib)
    : infinistreambuf(std::shared_ptr<infinibuf>(ib)) {}
  infinistreambuf(infinistreambuf &&isb)
    : infinistreambuf(isb.ib_) {}
  std::shared_ptr<infinibuf> get_infinibuf() { return ib_; }
  void sputeof();
};

class ifdstream : public std::istream {
  infinistreambuf isb_;
public:
  ifdstream(int fd)
    : std::istream (nullptr), isb_ (new infinibuf_infd(fd)) {
    init(&isb_);
  }
  ~ifdstream() {
    std::lock_guard<infinibuf> _lk (*isb_.get_infinibuf());
    isb_.get_infinibuf()->err(EPIPE);
  }
};

class ofdstream : public std::ostream {
  infinistreambuf isb_;
public:
  ofdstream(int fd, std::function<void(bool)> oblocked = nullptr)
    : std::ostream (nullptr), isb_(new infinibuf_outfd(fd, oblocked)) {
    init(&isb_);
  }
  ~ofdstream() {
    if (std::uncaught_exception())
      try { isb_.sputeof(); } catch(...) {}
    else
      isb_.sputeof();
  }
};

/** \brief std::istream from file descriptor with unbounded buffer.
 *
 * Continously reads from and buffers input from a file descriptor in
 * another thread.  Closes the file descriptor after receiving EOF.
 * Kill the input thread if any further input is received, but the
 * input thread could get stuck if no input and no EOF happens.
 * Maximum buffer size defaults to infinity but can be adjusted with
 * `ifdinfinistream::set_max_buf_size`.
 */
class ifdinfinistream : public std::istream {
  std::shared_ptr<infinibuf_mt> ib_ { new infinibuf_mt() };
  infinistreambuf isb_ { ib_ };
public:
  explicit ifdinfinistream (int fd, std::size_t size = 0)
    : std::istream (nullptr) {
    set_max_buf_size(size);
    std::thread t (infinibuf::input_loop, isb_.get_infinibuf(), fd);
    t.detach();
    init(&isb_);
  }
  /** Sets maximum buffer size, above which it will stop reading from
   * the file descriptor until more is consumed locally.
   *
   * A value of 0 means no maximum buffer size.  */
  void set_max_buf_size(std::size_t size) { ib_->set_max_buf_size(size); }
  ~ifdinfinistream() {
    std::lock_guard<infinibuf> _lk (*isb_.get_infinibuf());
    // Sadly, there appears to be no portable way of waking up the
    // thread waiting in read.  I tried using dup2 to replace the file
    // descriptor with /dev/null, or using fcntl to set the O_NONBLOCK
    // flag after the read has already started, and neither works on
    // linux.  What does work is setting an empty function (not
    // SIG_IGN) as the signal handler on SIGCONT, then setting
    // O_NONBLOCK on the file descriptor, and finally calling
    // pthread_kill(t.native_handle(), SIGCONT)--but that could have
    // unintended consequences on other parts of the program following
    // a Ctrl-Z.  The only truly clean solution is to use a
    // "self-pipe" to wake up a poll call, thereby using three file
    // descriptors for the job of one (yuck).  Since we don't really
    // need to clean up the file descriptor, I'm not going to add the
    // complexity and cost of polling a second "self-pipe" file
    // descriptor or dropping down to native_handle.
    isb_.get_infinibuf()->err(EPIPE);
  }
};

#if 0
/** \brief `ostream` from file descriptor with unbounded buffer.
 *
 * Buffers unbounded amounts of data which are drained to a file
 * descriptor in another thread.  The file descriptor is closed when
 * the draining thread exits.  The class destructor waits for the
 * writer thread to flush the buffer and exit.
 */
class ofdinfinistream : public std::ostream {
  infinistreambuf isb_ { new infinibuf_mt(0) };
  std::thread t_;
public:
  ofdinfinistream (int fd) {
    std::thread t (infinibuf::output_loop, isb_.get_infinibuf(), fd, nullptr);
    t_ = std::move(t);
    rdbuf(&isb_);
  }
  // Doesn't work because std::ostream's virtual destructor is noexcept.
  ~ofdinfinistream() noexcept(false) {
    isb_.sputeof();
    if (!std::uncaught_exception()) {
      t_.join();
      std::lock_guard<infinibuf> lk (*isb_.get_infinibuf());
      if (isb_.get_infinibuf()->err())
	throw std::runtime_error (std::string("~ofdinfinistream: ") +
				  strerror(isb_.get_infinibuf()->err()));
    }
  }
};
#endif

#endif /* !_INFINIBUF_H_ */