1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
|
/*++
/* NAME
/* nbbio 3
/* SUMMARY
/* non-blocking buffered I/O
/* SYNOPSIS
/* #include <nbbio.h>
/*
/* NBBIO *nbbio_create(fd, bufsize, label, action, context)
/* int fd;
/* ssize_t bufsize;
/* const char *label;
/* void (*action)(int event, void *context);
/* void *context;
/*
/* void nbbio_free(np)
/* NBBIO *np;
/*
/* void nbbio_enable_read(np, timeout)
/* NBBIO *np;
/* int timeout;
/*
/* void nbbio_enable_write(np, timeout)
/* NBBIO *np;
/* int timeout;
/*
/* void nbbio_disable_readwrite(np)
/* NBBIO *np;
/*
/* void nbbio_slumber(np, timeout)
/* NBBIO *np;
/* int timeout;
/*
/* int NBBIO_ACTIVE_FLAGS(np)
/* NBBIO *np;
/*
/* int NBBIO_ERROR_FLAGS(np)
/* NBBIO *np;
/*
/* const ssize_t NBBIO_BUFSIZE(np)
/* NBBIO *np;
/*
/* ssize_t NBBIO_READ_PEND(np)
/* NBBIO *np;
/*
/* char *NBBIO_READ_BUF(np)
/* NBBIO *np;
/*
/* const ssize_t NBBIO_WRITE_PEND(np)
/* NBBIO *np;
/*
/* char *NBBIO_WRITE_BUF(np)
/* NBBIO *np;
/* DESCRIPTION
/* This module implements low-level support for event-driven
/* I/O on a full-duplex stream. Read/write events are handled
/* by pseudothreads that run under control of the events(3)
/* module. After each I/O operation, the application is
/* notified via a call-back routine.
/*
/* It is up to the call-back routine to turn on/off read/write
/* events as appropriate. It is an error to leave read events
/* enabled for a buffer that is full, or to leave write events
/* enabled for a buffer that is empty.
/*
/* nbbio_create() creates a pair of buffers of the named size
/* for the named stream. The label specifies the purpose of
/* the stream, and is used for diagnostic messages. The
/* nbbio(3) event handler invokes the application call-back
/* routine with the current event type (EVENT_READ etc.) and
/* with the application-specified context.
/*
/* nbbio_free() terminates any pseudothreads associated with
/* the named buffer pair, closes the stream, and destroys the
/* buffer pair.
/*
/* nbbio_enable_read() enables a read pseudothread (if one
/* does not already exist) for the named buffer pair, and
/* (re)starts the buffer pair's timer. It is an error to enable
/* a read pseudothread while the read buffer is full, or while
/* a write pseudothread is still enabled.
/*
/* nbbio_enable_write() enables a write pseudothread (if one
/* does not already exist) for the named buffer pair, and
/* (re)starts the buffer pair's timer. It is an error to enable
/* a write pseudothread while the write buffer is empty, or
/* while a read pseudothread is still enabled.
/*
/* nbbio_disable_readwrite() disables any read/write pseudothreads
/* for the named buffer pair, including timeouts. To ensure
/* buffer liveness, use nbbio_slumber() instead of
/* nbbio_disable_readwrite(). It is not an error to call this
/* function while no read/write pseudothread is enabled.
/*
/* nbbio_slumber() disables any read/write pseudothreads for
/* the named buffer pair, but keeps the timer active to ensure
/* buffer liveness. It is not an error to call this function while
/* no read/write pseudothread is enabled.
/*
/* NBBIO_ERROR_FLAGS() returns the error flags for the named buffer
/* pair: zero or more of NBBIO_FLAG_EOF (read EOF), NBBIO_FLAG_ERROR
/* (read/write error) or NBBIO_FLAG_TIMEOUT (time limit
/* exceeded).
/*
/* NBBIO_ACTIVE_FLAGS() returns the pseudothread flags for the
/* named buffer pair: NBBIO_FLAG_READ (read pseudothread is
/* active), NBBIO_FLAG_WRITE (write pseudothread is active),
/* or zero (no pseudothread is active).
/*
/* NBBIO_WRITE_PEND() and NBBIO_WRITE_BUF() evaluate to the
/* number of to-be-written bytes and the write buffer for the
/* named buffer pair. NBBIO_WRITE_PEND() must be updated by
/* the application code that fills the write buffer; no more
/* than NBBIO_BUFSIZE() bytes may be filled.
/*
/* NBBIO_READ_PEND() and NBBIO_READ_BUF() evaluate to the
/* number of unread bytes and the read buffer for the named
/* buffer pair. NBBIO_READ_PEND() and NBBIO_READ_BUF() must
/* be updated by the application code that drains the read
/* buffer.
/* SEE ALSO
/* events(3) event manager
/* DIAGNOSTICS
/* Panic: interface violation.
/*
/* Fatal: out of memory.
/* LICENSE
/* .ad
/* .fi
/* The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/* Wietse Venema
/* IBM T.J. Watson Research
/* P.O. Box 704
/* Yorktown Heights, NY 10598, USA
/*
/* Wietse Venema
/* Google, Inc.
/* 111 8th Avenue
/* New York, NY 10011, USA
/*--*/
/*
* System library.
*/
#include <sys_defs.h>
#include <unistd.h>
#include <errno.h>
#include <string.h> /* memmove() */
/*
* Utility library.
*/
#include <mymalloc.h>
#include <msg.h>
#include <events.h>
#include <nbbio.h>
/* nbbio_event - perform one I/O step for a buffer pair, then notify the application */

static void nbbio_event(int event, void *context)
{
    const char *myname = "nbbio_event";
    NBBIO  *np = (NBBIO *) context;
    ssize_t room;
    ssize_t nbytes;

    switch (event) {

        /*
         * The descriptor is readable. Append new input after the bytes that
         * are still pending in the read buffer. Draining the buffer is the
         * application's job; getting here with a full buffer is an
         * interface violation.
         */
    case EVENT_READ:
        if (np->read_pend == np->bufsize) {
            msg_panic("%s: socket fd=%d: read buffer is full",
                      myname, np->fd);
        }
        if (np->read_pend < 0 || np->read_pend > np->bufsize) {
            msg_panic("%s: socket fd=%d: bad pending read count %ld",
                      myname, np->fd, (long) np->read_pend);
        }
        room = np->bufsize - np->read_pend;
        nbytes = read(np->fd, np->read_buf + np->read_pend, room);
        if (nbytes < 0) {
            /* Examine errno before any logging for this failure. */
            if (errno == EAGAIN) {
                msg_warn("%s: read() returns EAGAIN on readable descriptor",
                         myname);
            }
            np->flags |= NBBIO_FLAG_ERROR;
            if (msg_verbose) {
                msg_info("%s: read %s fd=%d: %m", myname, np->label, np->fd);
            }
        } else if (nbytes == 0) {
            np->flags |= NBBIO_FLAG_EOF;
            if (msg_verbose) {
                msg_info("%s: read EOF on %s fd=%d",
                         myname, np->label, np->fd);
            }
        } else {
            np->read_pend += nbytes;
            if (msg_verbose) {
                msg_info("%s: read %ld on %s fd=%d",
                         myname, (long) nbytes, np->label, np->fd);
            }
        }
        break;

        /*
         * The descriptor is writable. Send as much pending output as the
         * descriptor accepts, and move any unwritten remainder to the start
         * of the write buffer. A write() result of zero or less is reported
         * as an error.
         * 
         * XXX Enforce a total time limit to ensure liveness when a hostile
         * receiver sets a very small TCP window size.
         */
    case EVENT_WRITE:
        if (np->write_pend == 0) {
            msg_panic("%s: socket fd=%d: empty write buffer", myname, np->fd);
        }
        if (np->write_pend < 0 || np->write_pend > np->bufsize) {
            msg_panic("%s: socket fd=%d: bad pending write count %ld",
                      myname, np->fd, (long) np->write_pend);
        }
        nbytes = write(np->fd, np->write_buf, np->write_pend);
        if (nbytes <= 0) {
            if (errno == EAGAIN) {
                msg_warn("%s: write() returns EAGAIN on writable descriptor",
                         myname);
            }
            np->flags |= NBBIO_FLAG_ERROR;
            if (msg_verbose) {
                msg_info("%s: write %s fd=%d: %m", myname, np->label, np->fd);
            }
        } else {
            np->write_pend -= nbytes;
            /* Source and destination overlap, hence memmove(). */
            if (np->write_pend > 0) {
                memmove(np->write_buf, np->write_buf + nbytes, np->write_pend);
            }
        }
        break;

        /*
         * Exception event: flag an error for the application.
         */
    case EVENT_XCPT:
        np->flags |= NBBIO_FLAG_ERROR;
        if (msg_verbose) {
            msg_info("%s: error on %s fd=%d: %m", myname, np->label, np->fd);
        }
        break;

        /*
         * Timer event: flag a timeout for the application.
         */
    case EVENT_TIME:
        np->flags |= NBBIO_FLAG_TIMEOUT;
        if (msg_verbose) {
            msg_info("%s: %s timeout on %s fd=%d",
                     myname, NBBIO_OP_NAME(np), np->label, np->fd);
        }
        break;

        /*
         * Any other event type is an interface violation.
         */
    default:
        msg_panic("%s: unknown event %d", myname, event);
    }

    /*
     * Hand control to the application call-back with the event type and the
     * application context. It is expected to inspect the error flags, move
     * data out of or into the buffer pair, and choose the next I/O
     * operation.
     */
    np->action(event, np->context);
}
/* nbbio_enable_read - request read events for a buffer pair and (re)start its timer */

void    nbbio_enable_read(NBBIO *np, int timeout)
{
    const char *myname = "nbbio_enable_read";

    /*
     * Interface checks: no activity other than reading may be enabled, the
     * time limit must be positive, and the read buffer must have room left.
     */
    if ((np->flags & NBBIO_MASK_ACTIVE & ~NBBIO_FLAG_READ) != 0) {
        msg_panic("%s: socket fd=%d is enabled for %s",
                  myname, np->fd, NBBIO_OP_NAME(np));
    }
    if (timeout < 1) {
        msg_panic("%s: socket fd=%d: bad timeout %d",
                  myname, np->fd, timeout);
    }
    if (np->bufsize <= np->read_pend) {
        msg_panic("%s: socket fd=%d: read buffer is full",
                  myname, np->fd);
    }

    /*
     * Register for read events only if that was not already done; the timer
     * request is made on every call.
     */
    if (!(np->flags & NBBIO_FLAG_READ)) {
        event_enable_read(np->fd, nbbio_event, (void *) np);
        np->flags |= NBBIO_FLAG_READ;
    }
    event_request_timer(nbbio_event, (void *) np, timeout);
}
/* nbbio_enable_write - request write events for a buffer pair and (re)start its timer */

void    nbbio_enable_write(NBBIO *np, int timeout)
{
    const char *myname = "nbbio_enable_write";

    /*
     * Interface checks: no activity other than writing may be enabled, the
     * time limit must be positive, and there must be pending output.
     */
    if ((np->flags & NBBIO_MASK_ACTIVE & ~NBBIO_FLAG_WRITE) != 0) {
        msg_panic("%s: socket fd=%d is enabled for %s",
                  myname, np->fd, NBBIO_OP_NAME(np));
    }
    if (timeout < 1) {
        msg_panic("%s: socket fd=%d: bad timeout %d",
                  myname, np->fd, timeout);
    }
    if (np->write_pend < 1) {
        msg_panic("%s: socket fd=%d: empty write buffer",
                  myname, np->fd);
    }

    /*
     * Register for write events only if that was not already done; the
     * timer request is made on every call.
     */
    if (!(np->flags & NBBIO_FLAG_WRITE)) {
        event_enable_write(np->fd, nbbio_event, (void *) np);
        np->flags |= NBBIO_FLAG_WRITE;
    }
    event_request_timer(nbbio_event, (void *) np, timeout);
}
/* nbbio_disable_readwrite - stop read/write events and cancel the timer */

void    nbbio_disable_readwrite(NBBIO *np)
{

    /*
     * Clear the activity flags, stop read/write events on the descriptor,
     * and cancel the timer request made for this buffer pair. None of these
     * steps requires that an operation is currently enabled.
     */
    np->flags = np->flags & ~NBBIO_MASK_ACTIVE;
    event_disable_readwrite(np->fd);
    event_cancel_timer(nbbio_event, (void *) np);
}
/* nbbio_slumber - stop read/write events but keep a timer running */

void    nbbio_slumber(NBBIO *np, int timeout)
{

    /*
     * Same as nbbio_disable_readwrite(), except that the timer is requested
     * with the caller's time limit instead of being cancelled. The time
     * limit is passed to the event manager without validation here.
     */
    np->flags = np->flags & ~NBBIO_MASK_ACTIVE;
    event_disable_readwrite(np->fd);
    event_request_timer(nbbio_event, (void *) np, timeout);
}
/* nbbio_create - allocate and initialize a buffer pair for a descriptor */

NBBIO  *nbbio_create(int fd, ssize_t bufsize, const char *label,
                     NBBIO_ACTION action, void *context)
{
    NBBIO  *np;

    /*
     * Reject a negative descriptor or a non-positive buffer size.
     */
    if (fd < 0) {
        msg_panic("nbbio_create: bad file descriptor: %d", fd);
    }
    if (bufsize < 1) {
        msg_panic("nbbio_create: bad buffer size: %ld", (long) bufsize);
    }

    /*
     * Allocate the handle, a private copy of the label, and two buffers of
     * bufsize bytes each. Both buffers start out empty, and no activity or
     * error flags are set.
     */
    np = (NBBIO *) mymalloc(sizeof(NBBIO));
    np->label = mystrdup(label);
    np->read_buf = mymalloc(bufsize);
    np->write_buf = mymalloc(bufsize);

    np->fd = fd;
    np->bufsize = bufsize;
    np->flags = 0;
    np->read_pend = 0;
    np->write_pend = 0;

    /*
     * Remember the application call-back and its context.
     */
    np->action = action;
    np->context = context;

    return (np);
}
/* nbbio_free - stop events, close the descriptor, and release a buffer pair */

void    nbbio_free(NBBIO *np)
{

    /*
     * Stop read/write events and the timer before closing the descriptor.
     */
    nbbio_disable_readwrite(np);

    /*
     * The result of close() is deliberately ignored.
     */
    (void) close(np->fd);

    /*
     * Release the members first, the handle itself last.
     */
    myfree(np->label);
    myfree(np->read_buf);
    myfree(np->write_buf);
    myfree((void *) np);
}
|