1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/test_client.h"
#include <string.h>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/async_packet_socket.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/network/received_packet.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
// DESIGN: Each received packet is put into a list of packets.
// Callers can retrieve received packets from any thread by calling
// NextPacket.
// Convenience constructor: takes ownership of `socket` and delegates to the
// two-argument constructor with a null fake clock, so AdvanceTime() will use
// its non-fake-clock branch.
TestClient::TestClient(std::unique_ptr<AsyncPacketSocket> socket)
: TestClient(std::move(socket), nullptr) {}
// Takes ownership of `socket` and remembers the (possibly null) `fake_clock`.
// Hooks this client up to the socket so that every received packet is routed
// to OnPacket() and every ready-to-send signal to OnReadyToSend().
TestClient::TestClient(std::unique_ptr<AsyncPacketSocket> socket,
                       ThreadProcessingFakeClock* fake_clock)
    : fake_clock_(fake_clock), socket_(std::move(socket)) {
  // The callback is stored inside socket_, which is a member of this object,
  // so capturing `this` is what the callback needs to reach OnPacket().
  auto forward_to_on_packet = [this](AsyncPacketSocket* source,
                                     const ReceivedIpPacket& incoming) {
    OnPacket(source, incoming);
  };
  socket_->RegisterReceivedPacketCallback(forward_to_on_packet);
  socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend);
}
// Nothing to release by hand: members clean up after themselves.
TestClient::~TestClient() = default;
// Polls the socket, advancing time 1 ms per iteration, until it reports
// `state` or kTimeoutMs has elapsed. Returns whether the socket is in `state`
// when polling stops.
bool TestClient::CheckConnState(AsyncPacketSocket::State state) {
  const int64_t deadline = TimeAfter(kTimeoutMs);
  for (;;) {
    if (socket_->GetState() == state) {
      break;  // Desired state reached.
    }
    if (TimeUntil(deadline) <= 0) {
      break;  // Timed out.
    }
    AdvanceTime(1);
  }
  // Re-read the state so the result reflects the socket as it is right now.
  const bool reached = (socket_->GetState() == state);
  return reached;
}
int TestClient::Send(const char* buf, size_t size) {
AsyncSocketPacketOptions options;
return socket_->Send(buf, size, options);
}
int TestClient::SendTo(const char* buf,
size_t size,
const SocketAddress& dest) {
AsyncSocketPacketOptions options;
return socket_->SendTo(buf, size, dest, options);
}
std::unique_ptr<TestClient::Packet> TestClient::NextPacket(int timeout_ms) {
// If no packets are currently available, we go into a get/dispatch loop for
// at most timeout_ms. If, during the loop, a packet arrives, then we can
// stop early and return it.
// Note that the case where no packet arrives is important. We often want to
// test that a packet does not arrive.
// Note also that we only try to pump our current thread's message queue.
// Pumping another thread's queue could lead to messages being dispatched from
// the wrong thread to non-thread-safe objects.
int64_t end = TimeAfter(timeout_ms);
while (TimeUntil(end) > 0) {
{
MutexLock lock(&mutex_);
if (!packets_.empty()) {
break;
}
}
AdvanceTime(1);
}
// Return the first packet placed in the queue.
std::unique_ptr<Packet> packet;
MutexLock lock(&mutex_);
if (!packets_.empty()) {
packet = std::move(packets_.front());
packets_.erase(packets_.begin());
}
return packet;
}
bool TestClient::CheckNextPacket(const char* buf,
size_t size,
SocketAddress* addr) {
bool res = false;
std::unique_ptr<Packet> packet = NextPacket(kTimeoutMs);
if (packet) {
res = (packet->buf.size() == size &&
memcmp(packet->buf.data(), buf, size) == 0 &&
CheckTimestamp(packet->packet_time));
if (addr)
*addr = packet->addr;
}
return res;
}
// Validates a packet's arrival timestamp against the previously seen one.
// Fails when the timestamp is missing, or when a previous timestamp exists and
// the new one compares less than it. The new value (present or not) always
// becomes the remembered previous timestamp.
bool TestClient::CheckTimestamp(std::optional<Timestamp> packet_timestamp) {
  const bool is_present = static_cast<bool>(packet_timestamp);
  const bool went_backwards =
      prev_packet_timestamp_ && (packet_timestamp < prev_packet_timestamp_);
  prev_packet_timestamp_ = packet_timestamp;
  return is_present && !went_backwards;
}
// Lets `ms` milliseconds pass from the point of view of the test.
//
// With a fake clock, real waiting would never move TimeMillis(), so the fake
// clock is stepped in 1 ms increments until TimeMillis() has moved forward by
// `ms`. Without a fake clock, the current thread's message queue is processed
// for `ms` milliseconds instead.
void TestClient::AdvanceTime(int ms) {
  if (fake_clock_) {
    const int64_t target = TimeMillis() + ms;
    // NOTE(review): stepping 1 ms at a time (rather than one big jump) is
    // presumably so pending work gets a chance to run between steps — confirm
    // against ThreadProcessingFakeClock.
    while (TimeMillis() < target) {
      fake_clock_->AdvanceTime(TimeDelta::Millis(1));
    }
  } else {
    // Honor the requested duration; previously this was hard-coded to 1 ms
    // regardless of `ms`, which disagreed with the fake-clock branch.
    Thread::Current()->ProcessMessages(ms);
  }
}
bool TestClient::CheckNoPacket() {
return NextPacket(kNoPacketTimeoutMs) == nullptr;
}
// Returns the error code currently reported by the underlying socket.
int TestClient::GetError() {
  const int socket_error = socket_->GetError();
  return socket_error;
}
// Forwards the option `opt` with `value` to the underlying socket and returns
// the socket's result code unchanged.
int TestClient::SetOption(Socket::Option opt, int value) {
  const int result = socket_->SetOption(opt, value);
  return result;
}
void TestClient::OnPacket(AsyncPacketSocket* socket,
const ReceivedIpPacket& received_packet) {
MutexLock lock(&mutex_);
packets_.push_back(std::make_unique<Packet>(received_packet));
}
void TestClient::OnReadyToSend(AsyncPacketSocket* socket) {
++ready_to_send_count_;
}
// Builds a Packet snapshot from a received packet: records the sender's
// address, the payload bytes and the arrival time reported by
// `received_packet`.
TestClient::Packet::Packet(const ReceivedIpPacket& received_packet)
: addr(received_packet.source_address()),
// Copy received_packet payload to a buffer owned by Packet, so the Packet
// does not refer back to the payload memory of `received_packet`.
buf(received_packet.payload().data(), received_packet.payload().size()),
// Arrival time as reported by the received packet; CheckNextPacket() passes
// this to CheckTimestamp().
packet_time(received_packet.arrival_time()) {}
// Copy constructor: duplicates the address, the payload bytes and the
// timestamp of `p`.
TestClient::Packet::Packet(const Packet& p)
: addr(p.addr),
// Rebuild the buffer from p's data pointer and size, the same way the
// ReceivedIpPacket constructor builds its own copy of the payload.
buf(p.buf.data(), p.buf.size()),
packet_time(p.packet_time) {}
} // namespace webrtc
|