1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
use {
super::drive_server,
crate::{
os::windows::named_pipe::{
pipe_mode, DuplexPipeStream, PipeListener, RecvPipeStream, SendPipeStream,
},
tests::util::*,
},
std::{
io::{prelude::*, BufReader},
sync::{mpsc::Sender, Arc},
},
};
/// Produces the message exchanged by the tests in this file.
///
/// Delegates to `message` with no custom text (`None`), the given `server`
/// flag and `'\n'` as the final argument (presumably the terminator that
/// lets the peer read the message with `read_line` — confirm against
/// `message`).
fn msg(server: bool) -> Box<str> {
    let terminator = Some('\n');
    message(None, server, terminator)
}
/// Serves a single duplex connection: accepts it, splits it into halves,
/// receives the client's message, sends the server's message, and finally
/// reunites the two halves.
///
/// Every fallible step is labelled through `opname` ("accept", "reunite")
/// or by the `recv`/`send` helpers, and the first failure is returned.
fn handle_conn_duplex(
    listener: &mut PipeListener<pipe_mode::Bytes, pipe_mode::Bytes>,
) -> TestResult {
    let stream = listener.accept().opname("accept")?;
    let (mut rx, mut tx) = stream.split();

    // The server receives first and replies second; the duplex client
    // in this file does the opposite.
    recv(&mut rx, msg(false))?;
    send(&mut tx, msg(true))?;

    // Only the success of the reunion matters; the stream is discarded right away.
    drop(DuplexPipeStream::reunite(rx, tx).opname("reunite")?);
    Ok(())
}
/// Serves a single client-to-server connection: accepts it and checks that
/// the client's message arrives.
///
/// An accept failure is labelled "accept" through `opname`; otherwise the
/// outcome of `recv` is returned as-is.
fn handle_conn_cts(listener: &mut PipeListener<pipe_mode::Bytes, pipe_mode::None>) -> TestResult {
    let mut stream = listener.accept().opname("accept")?;
    let expected = msg(false);
    recv(&mut stream, expected)
}
/// Serves a single server-to-client connection: accepts it and sends the
/// server's message.
///
/// An accept failure is labelled "accept" through `opname`; otherwise the
/// outcome of `send` is returned as-is.
fn handle_conn_stc(listener: &mut PipeListener<pipe_mode::None, pipe_mode::Bytes>) -> TestResult {
    let mut stream = listener.accept().opname("accept")?;
    let payload = msg(true);
    send(&mut stream, payload)
}
/// Server entry point for the duplex byte-mode test.
///
/// Forwards `id`, `name_sender` and `num_clients` to `drive_server`, together
/// with a closure that calls `create_duplex::<pipe_mode::Bytes>()` on the
/// value it is handed (presumably the listener options — confirm against
/// `drive_server`) and `handle_conn_duplex` as the connection handler.
pub fn server_duplex(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| options.create_duplex::<pipe_mode::Bytes>(),
        handle_conn_duplex,
    )
}
/// Server entry point for the client-to-server byte-mode test.
///
/// Forwards `id`, `name_sender` and `num_clients` to `drive_server`, together
/// with a closure that calls `create_recv_only::<pipe_mode::Bytes>()` on the
/// value it is handed (presumably the listener options — confirm against
/// `drive_server`) and `handle_conn_cts` as the connection handler.
pub fn server_cts(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| options.create_recv_only::<pipe_mode::Bytes>(),
        handle_conn_cts,
    )
}
/// Server entry point for the server-to-client byte-mode test.
///
/// Forwards `id`, `name_sender` and `num_clients` to `drive_server`, together
/// with a closure that calls `create_send_only::<pipe_mode::Bytes>()` on the
/// value it is handed (presumably the listener options — confirm against
/// `drive_server`) and `handle_conn_stc` as the connection handler.
pub fn server_stc(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| options.create_send_only::<pipe_mode::Bytes>(),
        handle_conn_stc,
    )
}
/// Client side of the duplex byte-mode test.
///
/// Connects to the pipe at path `name`, splits the stream into halves, sends
/// the client's message, checks the server's reply, and finally reunites the
/// halves. Connection and reunion failures are labelled "connect" and
/// "reunite" through `opname`.
pub fn client_duplex(name: &str) -> TestResult {
    let stream = DuplexPipeStream::<pipe_mode::Bytes>::connect_by_path(name).opname("connect")?;
    let (mut rx, mut tx) = stream.split();

    // The client sends first and receives second; the duplex connection
    // handler in this file does the opposite.
    send(&mut tx, msg(false))?;
    recv(&mut rx, msg(true))?;

    // Only the success of the reunion matters; the stream is discarded right away.
    drop(DuplexPipeStream::reunite(rx, tx).opname("reunite")?);
    Ok(())
}
/// Client side of the client-to-server byte-mode test.
///
/// Connects a send-only stream to the pipe at path `name` (a failure is
/// labelled "connect" through `opname`) and sends the client's message.
pub fn client_cts(name: &str) -> TestResult {
    let mut stream = SendPipeStream::<pipe_mode::Bytes>::connect_by_path(name).opname("connect")?;
    let payload = msg(false);
    send(&mut stream, payload)
}
/// Client side of the server-to-client byte-mode test.
///
/// Connects a receive-only stream to the pipe at path `name` (a failure is
/// labelled "connect" through `opname`) and checks that the server's message
/// arrives.
pub fn client_stc(name: &str) -> TestResult {
    let mut stream = RecvPipeStream::<pipe_mode::Bytes>::connect_by_path(name).opname("connect")?;
    let expected = msg(true);
    recv(&mut stream, expected)
}
/// Reads one line from `conn` and checks that it equals `exp`.
///
/// The stream is wrapped in a fresh `BufReader` for the duration of the call.
/// A read failure is labelled "receive" through `opname`; a mismatch between
/// the received line and `exp` is reported by `ensure_eq!`.
fn recv(conn: &mut RecvPipeStream<pipe_mode::Bytes>, exp: impl AsRef<str>) -> TestResult {
    let exp_ = exp.as_ref();
    // Pre-size the buffer to the length of the expected message.
    let mut buf = String::with_capacity(exp_.len());

    let mut reader = BufReader::new(conn);
    reader.read_line(&mut buf).opname("receive")?;

    ensure_eq!(buf, exp_);
    Ok(())
}
/// Writes the whole of `msg` to `conn` and then flushes the stream.
///
/// A write failure is labelled "send" and a flush failure "flush", both
/// through `opname`.
fn send(conn: &mut SendPipeStream<pipe_mode::Bytes>, msg: impl AsRef<str>) -> TestResult {
    let payload = msg.as_ref().as_bytes();
    conn.write_all(payload).opname("send")?;
    conn.flush().opname("flush")
}
|