1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
use {
crate::{
local_socket::{
tokio::{prelude::*, Stream},
ListenerOptions, Name,
},
tests::util::*,
BoolExt, SubUsizeExt,
},
::tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader},
sync::oneshot::Sender,
task, try_join,
},
color_eyre::eyre::WrapErr,
std::{future::Future, str, sync::Arc},
};
/// Builds one of the test messages exchanged over the socket.
///
/// `server` is forwarded to `message` unchanged. `nts` picks the trailing
/// character handed to `message` by indexing the table `['\n', '\0']` with
/// `nts.to_usize()` (presumably `false` selects `'\n'` and `true` selects
/// `'\0'` — confirm against `BoolExt::to_usize`).
fn msg(server: bool, nts: bool) -> Box<str> {
    let terminators = ['\n', '\0'];
    let terminator = terminators[nts.to_usize()];
    message(None, server, Some(terminator))
}
/// Runs a listener that accepts `num_clients` connections and hands each one
/// to `handle_client` on its own spawned task.
///
/// `id` and `path` are forwarded to `namegen_local_socket`. The name that the
/// listener ends up bound to is sent through `name_sender`; a failure to send
/// it is deliberately ignored.
///
/// # Errors
///
/// Returns early if creating the listener or accepting a connection fails (in
/// the latter case, tasks that were already spawned are not awaited). After all
/// connections are accepted, the first task that fails to join or returns an
/// error ends the function with that error, wrapped in additional context.
pub async fn server<HCF: Future<Output = TestResult> + Send + 'static>(
    id: &str,
    mut handle_client: impl FnMut(Stream) -> HCF,
    name_sender: Sender<Arc<Name<'static>>>,
    num_clients: u32,
    path: bool,
) -> TestResult {
    // The name generator stays a temporary so that it is gone before the first `.await`.
    let picked = listen_and_pick_name(&mut namegen_local_socket(id, path), |nm| {
        ListenerOptions::new().name(nm.borrow()).create_tokio()
    });
    let (name, listener) = picked?;
    let _ = name_sender.send(name);

    let capacity: usize = num_clients.try_into().unwrap();
    let mut handlers = Vec::with_capacity(capacity);
    for _ in 0..num_clients {
        let conn = listener.accept().await.opname("accept")?;
        handlers.push(task::spawn(handle_client(conn)));
    }

    // Join in spawn order; the outer result is the join outcome, the inner one is the
    // handler's own result.
    for handler in handlers {
        let outcome = handler.await.context("server task panicked")?;
        outcome.context("server task returned early with error")?;
    }
    Ok(())
}
/// Server-side connection handler that talks over a shared reference to the
/// stream instead of splitting it.
///
/// Concurrently receives `msg(false, false)` followed by `msg(false, true)`
/// and sends `msg(true, false)` followed by `msg(true, true)`. The first
/// failure on either side is returned.
pub async fn handle_client_nosplit(conn: Stream) -> TestResult {
    let mut reader = BufReader::new(&conn);
    let mut writer = &conn;

    let inbound = async {
        recv(&mut reader, &msg(false, false), 0).await?;
        recv(&mut reader, &msg(false, true), 1).await
    };
    let outbound = async {
        send(&mut writer, &msg(true, false), 0).await?;
        send(&mut writer, &msg(true, true), 1).await
    };

    try_join!(inbound, outbound)?;
    Ok(())
}
/// Server-side connection handler that splits the stream into a receive half
/// and a send half, drives each half on its own spawned task, and reunites
/// the halves once both tasks are done.
///
/// The receive task expects `msg(true, false)` followed by `msg(true, true)`;
/// the send task writes `msg(false, false)` followed by `msg(false, true)`.
///
/// NOTE(review): these directions are the reverse of `handle_client_nosplit`
/// and mirror `client_split` — confirm that the split and non-split variants
/// are never meant to be paired with each other.
pub async fn handle_client_split(conn: Stream) -> TestResult {
    let (recv_half, send_half) = conn.split();

    let recv_task = task::spawn(async move {
        let mut reader = BufReader::new(recv_half);
        recv(&mut reader, &msg(true, false), 0).await?;
        recv(&mut reader, &msg(true, true), 1).await?;
        // Hand the unwrapped half back so that it can be reunited.
        TestResult::<_>::Ok(reader.into_inner())
    });
    let send_task = task::spawn(async move {
        let mut writer = send_half;
        send(&mut writer, &msg(false, false), 0).await?;
        send(&mut writer, &msg(false, true), 1).await?;
        TestResult::<_>::Ok(writer)
    });

    // The outer `?` covers join failures, the per-task `?` covers I/O and comparison errors.
    let (recv_outcome, send_outcome) = try_join!(recv_task, send_task)?;
    let (recv_half, send_half) = (recv_outcome?, send_outcome?);
    Stream::reunite(recv_half, send_half).opname("reunite")?;
    Ok(())
}
/// Client that connects to `nm` and talks over a shared reference to the
/// stream instead of splitting it.
///
/// Concurrently receives `msg(true, false)` followed by `msg(true, true)` and
/// sends `msg(false, false)` followed by `msg(false, true)`. The first failure
/// on either side, or a failure to connect, is returned.
pub async fn client_nosplit(nm: Arc<Name<'static>>) -> TestResult {
    let conn = Stream::connect(nm.borrow()).await.opname("connect")?;
    let mut reader = BufReader::new(&conn);
    let mut writer = &conn;

    let inbound = async {
        recv(&mut reader, &msg(true, false), 0).await?;
        recv(&mut reader, &msg(true, true), 1).await
    };
    let outbound = async {
        send(&mut writer, &msg(false, false), 0).await?;
        send(&mut writer, &msg(false, true), 1).await
    };

    try_join!(inbound, outbound)?;
    Ok(())
}
/// Client that connects to `name`, splits the stream into a receive half and
/// a send half, drives each half on its own spawned task, and reunites the
/// halves once both tasks are done.
///
/// The receive task expects `msg(false, false)` followed by
/// `msg(false, true)`; the send task writes `msg(true, false)` followed by
/// `msg(true, true)`.
///
/// NOTE(review): these directions are the reverse of `client_nosplit` and
/// mirror `handle_client_split` — confirm that the split and non-split
/// variants are never meant to be paired with each other.
pub async fn client_split(name: Arc<Name<'_>>) -> TestResult {
    let conn = Stream::connect(name.borrow()).await.opname("connect")?;
    let (recv_half, send_half) = conn.split();

    let recv_task = task::spawn(async move {
        let mut reader = BufReader::new(recv_half);
        recv(&mut reader, &msg(false, false), 0).await?;
        recv(&mut reader, &msg(false, true), 1).await?;
        // Hand the unwrapped half back so that it can be reunited.
        TestResult::<_>::Ok(reader.into_inner())
    });
    let send_task = task::spawn(async move {
        let mut writer = send_half;
        send(&mut writer, &msg(true, false), 0).await?;
        send(&mut writer, &msg(true, true), 1).await?;
        TestResult::<_>::Ok(writer)
    });

    // The outer `?` covers join failures, the per-task `?` covers I/O and comparison errors.
    let (recv_outcome, send_outcome) = try_join!(recv_task, send_task)?;
    let (recv_half, send_half) = (recv_outcome?, send_outcome?);
    Stream::reunite(recv_half, send_half).opname("reunite")?;
    Ok(())
}
/// Reads one terminated message from `conn` and checks that it equals `exp`.
///
/// The terminator is the last byte of `exp`; reading stops once that byte has
/// been read (or the reader runs out of data). `nr` selects the word "first"
/// (0) or "second" (1) used in error messages.
///
/// # Panics
///
/// Panics if `exp` is empty or if `nr` indexes past the two-entry label table.
async fn recv(conn: &mut (dyn AsyncBufRead + Unpin + Send), exp: &str, nr: u8) -> TestResult {
    let terminator = exp.bytes().next_back().unwrap();
    let fs = ["first", "second"][nr.to_usize()];
    let mut buffer = Vec::with_capacity(exp.len());

    let read_outcome = conn.read_until(terminator, &mut buffer).await;
    read_outcome.wrap_err_with(|| format!("{fs} receive failed"))?;

    // The received bytes include the terminator, just like `exp` does.
    ensure_eq!(
        str::from_utf8(&buffer).with_context(|| format!("{} receive wasn't valid UTF-8", fs))?,
        exp,
    );
    Ok(())
}
/// Writes all bytes of `msg` to `conn`.
///
/// `nr` selects the word "first" (0) or "second" (1) used in the error
/// message attached when the write fails.
///
/// # Panics
///
/// Panics if `nr` indexes past the two-entry label table.
async fn send(conn: &mut (dyn AsyncWrite + Unpin + Send), msg: &str, nr: u8) -> TestResult {
    let ordinal = ["first", "second"][nr.to_usize()];
    let write_outcome = conn.write_all(msg.as_bytes()).await;
    write_outcome.with_context(|| format!("{ordinal} socket send failed"))
}
|