1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
// Copyright (c) 2015-2016 Josh Blum
// SPDX-License-Identifier: BSL-1.0
#pragma once
#include "SoapyRemoteConfig.hpp"
#include <cstddef>
#include <vector>
class SoapyRPCSocket;
/*!
* The stream endpoint supports a windowed link datagram protocol.
* This endpoint can be operated in only one mode: receive or send,
* and must be paired with another differently configured endpoint.
*/
class SOAPY_REMOTE_API SoapyStreamEndpoint
{
public:
SoapyStreamEndpoint(
SoapyRPCSocket &streamSock,
SoapyRPCSocket &statusSock,
const bool datagramMode,
const bool isRecv,
const size_t numChans,
const size_t elemSize,
const size_t mtu,
const size_t window);
~SoapyStreamEndpoint(void);
//! How many channels configured
size_t getNumChans(void) const
{
return _numChans;
}
//! Element size in bytes
size_t getElemSize(void) const
{
return _elemSize;
}
//! Actual buffer size in elements
size_t getBuffSize(void) const
{
return _buffSize;
}
//! Actual number of buffers
size_t getNumBuffs(void) const
{
return _numBuffs;
}
//! Query handle addresses
void getAddrs(const size_t handle, void **buffs) const
{
for (size_t i = 0; i < _numChans; i++)
{
buffs[i] = _buffData[handle].buffs[i];
}
}
/*******************************************************************
* receive endpoint API
******************************************************************/
/*!
* Wait for a datagram to arrive at the socket
* return true when ready for false for timeout.
*/
bool waitRecv(const long timeoutUs);
/*!
* Acquire a receive buffer with metadata.
* return the number of elements or error code
*/
int acquireRecv(size_t &handle, const void **buffs, int &flags, long long &timeNs);
/*!
* Release the buffer when done.
*/
void releaseRecv(const size_t handle);
/*******************************************************************
* send endpoint API
******************************************************************/
/*!
* Wait for the flow control to allow transmission.
* return true when ready for false for timeout.
*/
bool waitSend(const long timeoutUs);
/*!
* Acquire a receive buffer with metadata.
*/
int acquireSend(size_t &handle, void **buffs);
/*!
* Release the buffer when done.
* pass in the number of elements or error code
*/
void releaseSend(const size_t handle, const int numElemsOrErr, int &flags, const long long timeNs);
/*******************************************************************
* status endpoint API -- used by both directions
******************************************************************/
/*!
* Wait for a status message to arrive
*/
bool waitStatus(const long timeoutUs);
/*!
* Read the stream status data.
* Return 0 or error code.
*/
int readStatus(size_t &chanMask, int &flags, long long &timeNs);
/*!
* Write the stream status from the forwarder.
*/
void writeStatus(const int code, const size_t chanMask, const int flags, const long long timeNs);
private:
SoapyRPCSocket &_streamSock;
SoapyRPCSocket &_statusSock;
const bool _datagramMode;
const size_t _xferSize;
const size_t _numChans;
const size_t _elemSize;
const size_t _buffSize;
const size_t _numBuffs;
struct BufferData
{
std::vector<char> buff; //actual POD
std::vector<void *> buffs; //pointers
bool acquired;
};
std::vector<BufferData> _buffData;
//acquire+release tracking
size_t _nextHandleAcquire;
size_t _nextHandleRelease;
size_t _numHandlesAcquired;
//sequence tracking
size_t _lastSendSequence;
size_t _lastRecvSequence;
size_t _maxInFlightSeqs;
bool _receiveInitial;
//how often to send a flow control ACK? (recv only)
size_t _triggerAckWindow;
//flow control helpers
void sendACK(void);
void recvACK(void);
};
|