File: stream.rs

package info (click to toggle)
rust-interprocess 1.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 980 kB
  • sloc: makefile: 2
file content (99 lines) | stat: -rw-r--r-- 2,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use {
    super::util::{NameGen, TestResult},
    anyhow::Context,
    futures::io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream},
    std::{convert::TryInto, io, sync::Arc},
    tokio::{sync::oneshot::Sender, task, try_join},
};

static SERVER_MSG: &str = "Hello from server!\n";
static CLIENT_MSG: &str = "Hello from client!\n";

pub async fn server(
    name_sender: Sender<String>,
    num_clients: u32,
    prefer_namespaced: bool,
) -> TestResult {
    async fn handle_conn(conn: LocalSocketStream) -> TestResult {
        let (reader, mut writer) = conn.into_split();
        let mut buffer = String::with_capacity(128);
        let mut reader = BufReader::new(reader);

        let read = async {
            reader
                .read_line(&mut buffer)
                .await
                .context("Socket receive failed")
        };
        let write = async {
            writer
                .write_all(SERVER_MSG.as_bytes())
                .await
                .context("Socket send failed")
        };
        try_join!(read, write)?;

        assert_eq!(buffer, CLIENT_MSG);
        Ok(())
    }

    let (name, listener) = NameGen::new_auto(prefer_namespaced)
        .find_map(|nm| {
            let l = match LocalSocketListener::bind(&*nm) {
                Ok(l) => l,
                Err(e) if e.kind() == io::ErrorKind::AddrInUse => return None,
                Err(e) => return Some(Err(e)),
            };
            Some(Ok((nm, l)))
        })
        .unwrap()
        .context("Listener bind failed")?;

    let _ = name_sender.send(name);

    let mut tasks = Vec::with_capacity(num_clients.try_into().unwrap());
    for _ in 0..num_clients {
        let conn = match listener.accept().await {
            Ok(c) => c,
            Err(e) => {
                eprintln!("Incoming connection failed: {}", e);
                continue;
            }
        };
        tasks.push(task::spawn(handle_conn(conn)));
    }
    for task in tasks {
        task.await
            .context("Server task panicked")?
            .context("Server task returned early with error")?;
    }
    Ok(())
}
pub async fn client(name: Arc<String>) -> TestResult {
    let mut buffer = String::with_capacity(128);

    let (reader, mut writer) = LocalSocketStream::connect(name.as_str())
        .await
        .context("Connect failed")?
        .into_split();
    let mut reader = BufReader::new(reader);

    let read = async {
        reader
            .read_line(&mut buffer)
            .await
            .context("Socket receive failed")
    };
    let write = async {
        writer
            .write_all(CLIENT_MSG.as_bytes())
            .await
            .context("Socket send failed")
    };
    try_join!(read, write)?;

    assert_eq!(buffer, SERVER_MSG);

    Ok(())
}