File: stream.rs

package info (click to toggle)
rust-interprocess 2.2.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,016 kB
  • sloc: makefile: 2
file content (136 lines) | stat: -rw-r--r-- 4,600 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use {
    crate::{
        local_socket::{
            tokio::{prelude::*, Stream},
            ListenerOptions, Name,
        },
        tests::util::*,
        BoolExt, SubUsizeExt,
    },
    ::tokio::{
        io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader},
        sync::oneshot::Sender,
        task, try_join,
    },
    color_eyre::eyre::WrapErr,
    std::{future::Future, str, sync::Arc},
};

fn msg(server: bool, nts: bool) -> Box<str> {
    message(None, server, Some(['\n', '\0'][nts.to_usize()]))
}

pub async fn server<HCF: Future<Output = TestResult> + Send + 'static>(
    id: &str,
    mut handle_client: impl FnMut(Stream) -> HCF,
    name_sender: Sender<Arc<Name<'static>>>,
    num_clients: u32,
    path: bool,
) -> TestResult {
    let (name, listener) = listen_and_pick_name(&mut namegen_local_socket(id, path), |nm| {
        ListenerOptions::new().name(nm.borrow()).create_tokio()
    })?;

    let _ = name_sender.send(name);

    let mut tasks = Vec::with_capacity(num_clients.try_into().unwrap());
    for _ in 0..num_clients {
        let conn = listener.accept().await.opname("accept")?;
        tasks.push(task::spawn(handle_client(conn)));
    }
    for task in tasks {
        task.await
            .context("server task panicked")?
            .context("server task returned early with error")?;
    }
    Ok(())
}

pub async fn handle_client_nosplit(conn: Stream) -> TestResult {
    let (mut recver, mut sender) = (BufReader::new(&conn), &conn);
    let recv = async {
        recv(&mut recver, &msg(false, false), 0).await?;
        recv(&mut recver, &msg(false, true), 1).await
    };
    let send = async {
        send(&mut sender, &msg(true, false), 0).await?;
        send(&mut sender, &msg(true, true), 1).await
    };
    try_join!(recv, send).map(|((), ())| ())
}

pub async fn handle_client_split(conn: Stream) -> TestResult {
    let (recver, sender) = conn.split();

    let recv = task::spawn(async move {
        let mut recver = BufReader::new(recver);
        recv(&mut recver, &msg(true, false), 0).await?;
        recv(&mut recver, &msg(true, true), 1).await?;
        TestResult::<_>::Ok(recver.into_inner())
    });
    let send = task::spawn(async move {
        let mut sender = sender;
        send(&mut sender, &msg(false, false), 0).await?;
        send(&mut sender, &msg(false, true), 1).await?;
        TestResult::<_>::Ok(sender)
    });

    let (recver, sender) = try_join!(recv, send)?;
    Stream::reunite(recver?, sender?).opname("reunite")?;
    Ok(())
}

pub async fn client_nosplit(nm: Arc<Name<'static>>) -> TestResult {
    let conn = Stream::connect(nm.borrow()).await.opname("connect")?;
    let (mut recver, mut sender) = (BufReader::new(&conn), &conn);
    let recv = async {
        recv(&mut recver, &msg(true, false), 0).await?;
        recv(&mut recver, &msg(true, true), 1).await
    };
    let send = async {
        send(&mut sender, &msg(false, false), 0).await?;
        send(&mut sender, &msg(false, true), 1).await
    };
    try_join!(recv, send).map(|((), ())| ())
}

pub async fn client_split(name: Arc<Name<'_>>) -> TestResult {
    let (recver, sender) = Stream::connect(name.borrow()).await.opname("connect")?.split();

    let recv = task::spawn(async move {
        let mut recver = BufReader::new(recver);
        recv(&mut recver, &msg(false, false), 0).await?;
        recv(&mut recver, &msg(false, true), 1).await?;
        TestResult::<_>::Ok(recver.into_inner())
    });
    let send = task::spawn(async move {
        let mut sender = sender;
        send(&mut sender, &msg(true, false), 0).await?;
        send(&mut sender, &msg(true, true), 1).await?;
        TestResult::<_>::Ok(sender)
    });

    let (recver, sender) = try_join!(recv, send)?;
    Stream::reunite(recver?, sender?).opname("reunite")?;
    Ok(())
}

async fn recv(conn: &mut (dyn AsyncBufRead + Unpin + Send), exp: &str, nr: u8) -> TestResult {
    let term = *exp.as_bytes().last().unwrap();
    let fs = ["first", "second"][nr.to_usize()];

    let mut buffer = Vec::with_capacity(exp.len());
    conn.read_until(term, &mut buffer)
        .await
        .wrap_err_with(|| format!("{} receive failed", fs))?;
    ensure_eq!(
        str::from_utf8(&buffer).with_context(|| format!("{} receive wasn't valid UTF-8", fs))?,
        exp,
    );
    Ok(())
}

async fn send(conn: &mut (dyn AsyncWrite + Unpin + Send), msg: &str, nr: u8) -> TestResult {
    let fs = ["first", "second"][nr.to_usize()];
    conn.write_all(msg.as_bytes()).await.with_context(|| format!("{} socket send failed", fs))
}