1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
/*
* rocks/flight.c
*
* In-flight buffering and recovery.
*
* Copyright (C) 2001 Victor Zandy
* See COPYING for distribution terms.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "rs.h"
#include "ring.h"
#include "log.h"
/* Work out how much data can be in flight on socket SD in each
   direction, storing the bounds in *MAXSND and *MAXRCV.

   The send bound is our SO_SNDBUF plus the peer's SO_RCVBUF; the
   receive bound is our SO_RCVBUF plus the peer's SO_SNDBUF.  Each
   side writes its two sizes to SD (send size first) as 32-bit values
   in network byte order and then reads the peer's pair, so a call
   here must be synchronized with a matching call by the peer.

   Return 0 on success, -1 on failure.  On failure *MAXSND and
   *MAXRCV are left untouched. */
int
rs_inflight_limits(int sd, unsigned *maxsnd, unsigned *maxrcv)
{
	unsigned own_snd, own_rcv;	 /* local socket buffer sizes */
	unsigned remote_snd, remote_rcv; /* peer's sizes, still in network order */
	unsigned wire;			 /* outgoing value in network order */
	socklen_t len;
	int rc;

	/* Look up the local buffer sizes. */
	len = sizeof own_snd;
	if (getsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *) &own_snd, &len) < 0)
		return -1;

	len = sizeof own_rcv;
	if (getsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *) &own_rcv, &len) < 0)
		return -1;

	/* Announce them to the peer: send size, then receive size. */
	wire = htonl(own_snd);
	rc = rs_xwrite(sd, &wire, sizeof wire);
	if (rc < 0)
		return -1;

	wire = htonl(own_rcv);
	rc = rs_xwrite(sd, &wire, sizeof wire);
	if (rc < 0)
		return -1;

	/* Collect the peer's announcement, which arrives in the same order.
	   NOTE(review): only a negative return is treated as failure;
	   confirm rs_xread() cannot report success (e.g. 0 at EOF)
	   without having filled the whole buffer. */
	rc = rs_xread(sd, &remote_snd, sizeof remote_snd, 0);
	if (rc < 0)
		return -1;

	rc = rs_xread(sd, &remote_rcv, sizeof remote_rcv, 0);
	if (rc < 0)
		return -1;

	/* What we send may sit in our send buffer or the peer's receive
	   buffer, and vice versa for what we receive. */
	*maxsnd = own_snd + ntohl(remote_rcv);
	*maxrcv = own_rcv + ntohl(remote_snd);
	return 0;
}
/* Resend over socket SD whatever data in RING the peer has not yet
   consumed.

   The peers first swap receive sequence numbers (RCVSEQ is ours).
   The peer's number is passed to rs_set_ring_seq() so that bytes the
   peer has already consumed are discarded from RING.  The in-flight
   limits are then renegotiated with rs_inflight_limits() and are
   asserted to equal the existing *MAXSND and *MAXRCV, which are not
   modified.  Finally, any bytes still held in RING are written to SD.

   A sequence number travels as exactly 32 bits in network byte order,
   independent of the host's sizeof(unsigned long) and endianness;
   only the low 32 bits of RCVSEQ are transmitted.

   SNDSEQ is accepted but unused.  Because this both writes to and
   reads from the peer, a call must be matched by a call on the peer.
   Return 0 on success, -1 on failure. */
int rs_inflight_recover(int sd, ring_t ring,
			unsigned long rcvseq, unsigned long sndseq,
			unsigned *maxsnd, unsigned *maxrcv)
{
	uint32_t wire_seq;	/* sequence number in its on-the-wire form */
	unsigned long rseq;	/* peer's recv sequence number */
	unsigned long nbytes;
	unsigned new_maxsnd, new_maxrcv;

	(void) sndseq;		/* unused; kept for the callers' interface */

	/* Exchange sequence numbers.  htonl()/ntohl() operate on 32-bit
	   values, so the transfer uses a 32-bit object: sending
	   sizeof(unsigned long) bytes would put 8 bytes on the wire on
	   LP64 hosts, with the payload in a host-endian-dependent half. */
	wire_seq = htonl((uint32_t) rcvseq);
	if (0 > rs_xwrite(sd, &wire_seq, sizeof(wire_seq)))
		return -1;
	if (0 > rs_xread(sd, &wire_seq, sizeof(wire_seq), 0))
		return -1;
	rseq = ntohl(wire_seq);

	/* Discard bytes the receiver has consumed */
	rs_set_ring_seq(ring, rseq);

	/* Get new buffer sizes (which currently must be the same) */
	if (0 > rs_inflight_limits(sd, &new_maxsnd, &new_maxrcv))
		return -1;
	assert(new_maxsnd == *maxsnd);
	assert(new_maxrcv == *maxrcv);

	nbytes = rs_ring_nbytes(ring);
	if (!nbytes)
		return 0;	/* Nothing to resend */

	/* This write may block as the data is transferred to peer,
	   but it should be bounded, as we know there's enough room in
	   our combined TCP buffers to hold it all. */
	assert(nbytes <= *maxsnd);	/* Otherwise write might block
					   indefinitely. */
	if (0 > rs_xwrite(sd, rs_ring_data(ring), nbytes))
		return -1;
	return 0;
}
|