File: msg.rs

package info (click to toggle)
rust-interprocess 2.2.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,016 kB
  • sloc: makefile: 2
file content (155 lines) | stat: -rw-r--r-- 4,519 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use {
    super::drive_server,
    crate::{
        os::windows::named_pipe::{
            pipe_mode, DuplexPipeStream, PipeListener, PipeMode, RecvPipeStream, SendPipeStream,
        },
        tests::util::*,
        SubUsizeExt,
    },
    color_eyre::eyre::{ensure, WrapErr},
    recvmsg::{MsgBuf, RecvMsg, RecvResult},
    std::{
        str,
        sync::{mpsc::Sender, Arc},
    },
};

/// Builds the pair of test messages exchanged over a connection.
///
/// Both messages are produced by the shared `message` helper from the texts `First` and `Second`
/// (in that order), with `server` forwarded unchanged and no terminator argument supplied.
fn msgs(server: bool) -> [Box<str>; 2] {
    let first = message(Some(format_args!("First")), server, None);
    let second = message(Some(format_args!("Second")), server, None);
    [first, second]
}
/// Interprets the received bytes `m` as UTF-8 text.
///
/// # Errors
/// Returns the decoding error, wrapped with a message saying that the received message was not
/// valid UTF-8.
fn futf8(m: &[u8]) -> TestResult<&str> {
    let decoded = str::from_utf8(m);
    decoded.context("received message was not valid UTF-8")
}

/// Server-side handler for one duplex connection.
///
/// Accepts a connection from `listener`, splits it into a receive half and a send half, receives
/// the two messages built by `msgs(false)`, sends the two built by `msgs(true)`, and finally
/// reunites the halves.
///
/// # Errors
/// Propagates the first failure from accepting, receiving, sending or reuniting.
fn handle_conn_duplex(
    listener: &mut PipeListener<pipe_mode::Messages, pipe_mode::Messages>,
) -> TestResult {
    let stream = listener.accept().opname("accept")?;
    let (mut rx, mut tx) = stream.split();

    // Pair each message with its zero-based position in the exchange.
    for (nr, expected) in (0u8..).zip(msgs(false)) {
        recv(&mut rx, expected, nr)?;
    }
    for (nr, outgoing) in (0u8..).zip(msgs(true)) {
        send(&mut tx, outgoing, nr)?;
    }

    DuplexPipeStream::reunite(rx, tx).opname("reunite")?;
    Ok(())
}
/// Server-side handler for one receive-only connection.
///
/// Accepts a connection from `listener` and receives the two messages built by `msgs(false)`,
/// in order.
///
/// # Errors
/// Propagates the first failure from accepting or receiving.
fn handle_conn_cts(
    listener: &mut PipeListener<pipe_mode::Messages, pipe_mode::None>,
) -> TestResult {
    let mut rx = listener.accept().opname("accept")?;
    for (nr, expected) in (0u8..).zip(msgs(false)) {
        recv(&mut rx, expected, nr)?;
    }
    Ok(())
}
/// Server-side handler for one send-only connection.
///
/// Accepts a connection from `listener` and sends the two messages built by `msgs(true)`,
/// in order.
///
/// # Errors
/// Propagates the first failure from accepting or sending.
fn handle_conn_stc(
    listener: &mut PipeListener<pipe_mode::None, pipe_mode::Messages>,
) -> TestResult {
    let mut tx = listener.accept().opname("accept")?;
    let [first, second] = msgs(true);
    send(&mut tx, first, 0)?;
    send(&mut tx, second, 1)?;
    Ok(())
}

/// Runs the duplex message-mode server by delegating to `drive_server`.
///
/// `id`, `name_sender` and `num_clients` are forwarded unchanged. The listener is created with
/// `PipeMode::Messages` through `create_duplex`, and `handle_conn_duplex` is passed as the
/// connection handler.
pub fn server_duplex(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| {
            let options = options.mode(PipeMode::Messages);
            options.create_duplex::<pipe_mode::Messages>()
        },
        handle_conn_duplex,
    )
}
/// Runs the receive-only message-mode server by delegating to `drive_server`.
///
/// `id`, `name_sender` and `num_clients` are forwarded unchanged. The listener is created with
/// `PipeMode::Messages` through `create_recv_only`, and `handle_conn_cts` is passed as the
/// connection handler.
pub fn server_cts(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| {
            let options = options.mode(PipeMode::Messages);
            options.create_recv_only::<pipe_mode::Messages>()
        },
        handle_conn_cts,
    )
}
/// Runs the send-only message-mode server by delegating to `drive_server`.
///
/// `id`, `name_sender` and `num_clients` are forwarded unchanged. The listener is created with
/// `PipeMode::Messages` through `create_send_only`, and `handle_conn_stc` is passed as the
/// connection handler.
pub fn server_stc(id: &str, name_sender: Sender<Arc<str>>, num_clients: u32) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |options| {
            let options = options.mode(PipeMode::Messages);
            options.create_send_only::<pipe_mode::Messages>()
        },
        handle_conn_stc,
    )
}

/// Client side of the duplex exchange.
///
/// Connects to the pipe at `name`, splits the stream into a receive half and a send half, sends
/// the two messages built by `msgs(false)`, receives the two built by `msgs(true)`, and finally
/// reunites the halves.
///
/// # Errors
/// Propagates the first failure from connecting, sending, receiving or reuniting.
pub fn client_duplex(name: &str) -> TestResult {
    let stream = DuplexPipeStream::<pipe_mode::Messages>::connect_by_path(name)
        .opname("connect")?;
    let (mut rx, mut tx) = stream.split();

    // Pair each message with its zero-based position in the exchange.
    for (nr, outgoing) in (0u8..).zip(msgs(false)) {
        send(&mut tx, outgoing, nr)?;
    }
    for (nr, expected) in (0u8..).zip(msgs(true)) {
        recv(&mut rx, expected, nr)?;
    }

    DuplexPipeStream::reunite(rx, tx).opname("reunite")?;
    Ok(())
}
/// Client side of the send-only exchange.
///
/// Connects a send-only stream to the pipe at `name` and sends the two messages built by
/// `msgs(false)`, in order.
///
/// # Errors
/// Propagates the first failure from connecting or sending.
pub fn client_cts(name: &str) -> TestResult {
    let connected = SendPipeStream::<pipe_mode::Messages>::connect_by_path(name);
    let mut tx = connected.opname("connect")?;
    for (nr, outgoing) in (0u8..).zip(msgs(false)) {
        send(&mut tx, outgoing, nr)?;
    }
    Ok(())
}
/// Client side of the receive-only exchange.
///
/// Connects a receive-only stream to the pipe at `name` and receives the two messages built by
/// `msgs(true)`, in order.
///
/// # Errors
/// Propagates the first failure from connecting or receiving.
pub fn client_stc(name: &str) -> TestResult {
    let connected = RecvPipeStream::<pipe_mode::Messages>::connect_by_path(name);
    let mut rx = connected.opname("connect")?;
    let [first, second] = msgs(true);
    recv(&mut rx, first, 0)?;
    recv(&mut rx, second, 1)?;
    Ok(())
}

/// Receives one message from `conn` and checks it against `exp`.
///
/// `nr` is the zero-based position of the message in the exchange. It selects the ordinal used in
/// the error context and, when equal to 2, enables the spill check: the buffer is created with a
/// requested capacity one byte short of the expected length and the receive must report
/// `RecvResult::Spilled`. For any other `nr` the requested capacity is the expected length and
/// the receive must report `RecvResult::Fit`.
///
/// # Errors
/// Fails if the receive itself fails, if the payload is not valid UTF-8, if it differs from
/// `exp`, or if the receive result is not the expected variant.
fn recv(
    conn: &mut RecvPipeStream<pipe_mode::Messages>,
    exp: impl AsRef<str>,
    nr: u8,
) -> TestResult {
    // Checked lookup: plain indexing into a two-entry table would panic for `nr == 2`, which
    // made the spill branch below impossible to reach.
    let fs = ["first", "second", "third"].get(nr.to_usize()).copied().unwrap_or("subsequent");
    let exp_ = exp.as_ref();
    let expect_spill = nr == 2;
    let len = if expect_spill {
        // tests spill; saturating so that an empty expected message cannot underflow
        exp_.len().saturating_sub(1)
    } else {
        exp_.len()
    };
    let mut buf = MsgBuf::from(Vec::with_capacity(len));

    let rslt = conn.recv_msg(&mut buf, None).with_context(|| format!("{} receive failed", fs))?;

    ensure_eq!(futf8(buf.filled_part())?, exp_);
    if expect_spill {
        ensure!(matches!(rslt, RecvResult::Spilled));
    } else {
        ensure!(matches!(rslt, RecvResult::Fit));
    }
    Ok(())
}

/// Sends `msg` over `conn` as a single message and checks that it was sent in full.
///
/// `nr` is the zero-based position of the message in the exchange and only selects the ordinal
/// used in the error context; values other than 0 and 1 panic on the table lookup.
///
/// # Errors
/// Fails if the send itself fails or if the reported number of bytes sent differs from the
/// length of `msg`.
fn send(
    conn: &mut SendPipeStream<pipe_mode::Messages>,
    msg: impl AsRef<str>,
    nr: u8,
) -> TestResult {
    const ORDINALS: [&str; 2] = ["first", "second"];

    let msg_ = msg.as_ref();
    let ordinal = ORDINALS[nr.to_usize()];

    let sent = conn
        .send(msg_.as_bytes())
        .wrap_err_with(|| format!("{ordinal} send failed"))?;

    ensure_eq!(sent, msg_.len());
    Ok(())
}