File: SoapyStreamEndpoint.hpp

package info (click to toggle)
soapyremote 0.4.3-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 504 kB
  • sloc: cpp: 6,740; makefile: 13
file content (161 lines) | stat: -rw-r--r-- 4,204 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright (c) 2015-2016 Josh Blum
// SPDX-License-Identifier: BSL-1.0

#pragma once
#include "SoapyRemoteConfig.hpp"
#include <cstddef>
#include <vector>

class SoapyRPCSocket;

/*!
 * The stream endpoint supports a windowed link datagram protocol.
 * This endpoint can be operated in only one mode: receive or send,
 * and must be paired with another differently configured endpoint.
 */
class SOAPY_REMOTE_API SoapyStreamEndpoint
{
public:
    SoapyStreamEndpoint(
        SoapyRPCSocket &streamSock,
        SoapyRPCSocket &statusSock,
        const bool datagramMode,
        const bool isRecv,
        const size_t numChans,
        const size_t elemSize,
        const size_t mtu,
        const size_t window);

    ~SoapyStreamEndpoint(void);

    //! How many channels configured
    size_t getNumChans(void) const
    {
        return _numChans;
    }

    //! Element size in bytes
    size_t getElemSize(void) const
    {
        return _elemSize;
    }

    //! Actual buffer size in elements
    size_t getBuffSize(void) const
    {
        return _buffSize;
    }

    //! Actual number of buffers
    size_t getNumBuffs(void) const
    {
        return _numBuffs;
    }

    //! Query handle addresses
    void getAddrs(const size_t handle, void **buffs) const
    {
        for (size_t i = 0; i < _numChans; i++)
        {
            buffs[i] = _buffData[handle].buffs[i];
        }
    }

    /*******************************************************************
     * receive endpoint API
     ******************************************************************/

    /*!
     * Wait for a datagram to arrive at the socket
     * return true when ready for false for timeout.
     */
    bool waitRecv(const long timeoutUs);

    /*!
     * Acquire a receive buffer with metadata.
     * return the number of elements or error code
     */
    int acquireRecv(size_t &handle, const void **buffs, int &flags, long long &timeNs);

    /*!
     * Release the buffer when done.
     */
    void releaseRecv(const size_t handle);

    /*******************************************************************
     * send endpoint API
     ******************************************************************/

    /*!
     * Wait for the flow control to allow transmission.
     * return true when ready for false for timeout.
     */
    bool waitSend(const long timeoutUs);

    /*!
     * Acquire a receive buffer with metadata.
     */
    int acquireSend(size_t &handle, void **buffs);

    /*!
     * Release the buffer when done.
     * pass in the number of elements or error code
     */
    void releaseSend(const size_t handle, const int numElemsOrErr, int &flags, const long long timeNs);

    /*******************************************************************
     * status endpoint API -- used by both directions
     ******************************************************************/

    /*!
     * Wait for a status message to arrive
     */
    bool waitStatus(const long timeoutUs);

    /*!
     * Read the stream status data.
     * Return 0 or error code.
     */
    int readStatus(size_t &chanMask, int &flags, long long &timeNs);

    /*!
     * Write the stream status from the forwarder.
     */
    void writeStatus(const int code, const size_t chanMask, const int flags, const long long timeNs);

private:
    SoapyRPCSocket &_streamSock;
    SoapyRPCSocket &_statusSock;
    const bool _datagramMode;
    const size_t _xferSize;
    const size_t _numChans;
    const size_t _elemSize;
    const size_t _buffSize;
    const size_t _numBuffs;

    struct BufferData
    {
        std::vector<char> buff; //actual POD
        std::vector<void *> buffs; //pointers
        bool acquired;
    };
    std::vector<BufferData> _buffData;

    //acquire+release tracking
    size_t _nextHandleAcquire;
    size_t _nextHandleRelease;
    size_t _numHandlesAcquired;

    //sequence tracking
    size_t _lastSendSequence;
    size_t _lastRecvSequence;
    size_t _maxInFlightSeqs;
    bool _receiveInitial;

    //how often to send a flow control ACK? (recv only)
    size_t _triggerAckWindow;

    //flow control helpers
    void sendACK(void);
    void recvACK(void);
};