//
// Copyright 2011,2014 Ettus Research LLC
// Copyright 2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/usrp/common/recv_packet_demuxer.hpp>
#include <deque>
#include <mutex>
#include <queue>
#include <vector>
using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;
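// Minimal managed_recv_buffer with a small inline payload. It carries the
// synthetic error packet handed back to the caller; release() frees the object.
struct recv_pkt_demux_mrb : public managed_recv_buffer
{
public:
    recv_pkt_demux_mrb(void)
    { /*NOP*/
    }

    void release(void) override
    {
        delete this;
    }

    uint32_t buff[10];
};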
static UHD_INLINE uint32_t extract_sid(managed_recv_buffer::sptr& buff)
{
    // Assumes the packet data is little-endian; the stream ID is read from
    // the second 32-bit word of the buffer.
    return uhd::wtohx(buff->cast<const uint32_t*>()[1]);
}
recv_packet_demuxer::~recv_packet_demuxer(void)
{
    /* NOP */
}
class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer
{
public:
    recv_packet_demuxer_impl(transport::zero_copy_if::sptr transport,
        const size_t size,
        const uint32_t sid_base)
        : _transport(transport), _sid_base(sid_base), _queues(size)
    {
        /* NOP */
    }
    managed_recv_buffer::sptr get_recv_buff(
        const size_t index, const double timeout) override
    {
        std::lock_guard<std::mutex> lock(_mutex);
        managed_recv_buffer::sptr buff;

        // a packet for this channel is already queued, so pop and return it
        if (not _queues[index].wrapper.empty()) {
            std::swap(buff, _queues[index].wrapper.front());
            _queues[index].wrapper.pop();
            return buff;
        }

        while (true) {
            // otherwise call into the transport
            buff = _transport->get_recv_buff(timeout);
            if (buff.get() == nullptr)
                return buff; // timeout

            // check the stream ID to know which channel the packet belongs to
            const size_t rx_index = extract_sid(buff) - _sid_base;
            if (rx_index == index)
                return buff; // got expected message

            // otherwise queue it for its own channel and try again
            if (rx_index < _queues.size())
                _queues[rx_index].wrapper.push(buff);
            else {
                UHD_LOGGER_ERROR("STREAMER")
                    << "Got a data packet with unknown SID " << extract_sid(buff);
                // unknown SID: return a synthetic one-word packet on the
                // requested channel's SID whose payload is the overflow code
                recv_pkt_demux_mrb* mrb = new recv_pkt_demux_mrb();
                vrt::if_packet_info_t info;
                info.packet_type         = vrt::if_packet_info_t::PACKET_TYPE_DATA;
                info.num_payload_words32 = 1;
                info.num_payload_bytes = info.num_payload_words32 * sizeof(uint32_t);
                info.has_sid           = true;
                info.sid               = _sid_base + index;
                vrt::if_hdr_pack_le(mrb->buff, info);
                mrb->buff[info.num_header_words32] = rx_metadata_t::ERROR_CODE_OVERFLOW;
                return mrb->make(
                    mrb, mrb->buff, info.num_packet_words32 * sizeof(uint32_t));
            }
        }
    }

private:
    transport::zero_copy_if::sptr _transport;
    const uint32_t _sid_base;
    std::mutex _mutex;
    struct channel_guts_type
    {
        channel_guts_type(void) : wrapper(container) {}
        std::deque<managed_recv_buffer::sptr> container;
        std::queue<managed_recv_buffer::sptr> wrapper;
    };
    std::vector<channel_guts_type> _queues;
};
recv_packet_demuxer::sptr recv_packet_demuxer::make(
    transport::zero_copy_if::sptr transport, const size_t size, const uint32_t sid_base)
{
    return sptr(new recv_packet_demuxer_impl(transport, size, sid_base));
}