File: drive.rs

package info (click to toggle)
rust-interprocess 2.2.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,016 kB
  • sloc: makefile: 2
file content (112 lines) | stat: -rw-r--r-- 3,839 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use {
    super::{choke::*, num_clients, num_concurrent_clients, TestResult},
    color_eyre::eyre::{bail, Context},
    std::{
        borrow::Borrow,
        io,
        sync::mpsc::{channel, /*Receiver,*/ Sender},
        thread,
    },
};

/// Waits for the leader closure to reach a point where it sends a message for the follower closure,
/// then runs the follower. Captures Eyre errors on both sides and bubbles them up if they occur,
/// reporting which side produced the error.
pub fn drive_pair<T, Ld, Fl>(
    leader: Ld,
    leader_name: &str,
    follower: Fl,
    follower_name: &str,
) -> TestResult
where
    T: Send,
    Ld: FnOnce(Sender<T>) -> TestResult + Send,
    Fl: FnOnce(T) -> TestResult,
{
    thread::scope(|scope| {
        let (sender, receiver) = channel();

        let ltname = leader_name.to_lowercase();
        let leading_thread = thread::Builder::new()
            .name(ltname)
            .spawn_scoped(scope, move || leader(sender))
            .with_context(|| format!("{leader_name} thread launch failed"))?;

        if let Ok(msg) = receiver.recv() {
            // If the leader reached the send point, proceed with the follower code
            let rslt = follower(msg);
            exclude_deadconn(rslt)
                .with_context(|| format!("{follower_name} exited early with error"))?;
        }
        let Ok(rslt) = leading_thread.join() else {
            bail!("{leader_name} panicked");
        };
        exclude_deadconn(rslt).with_context(|| format!("{leader_name} exited early with error"))
    })
}

/// Filters out errors that merely reflect the other side having gone away, which happens when the
/// other side returned an error of its own that has not been bubbled up in time.
///
/// `Ok` values are passed through untouched. An `Err` is turned into `Ok(())` only if its root
/// cause is an [`io::Error`] whose kind is one of the connection-loss kinds listed in the body;
/// every other error is returned unchanged.
#[rustfmt::skip] // preserves the one-kind-per-line layout of the pattern below
fn exclude_deadconn(r: TestResult) -> TestResult {
    use io::ErrorKind as Kind;

    let report = match r {
        Err(report) => report,
        ok => return ok,
    };

    // Only the innermost cause is inspected, and only when it is an I/O error.
    let root_kind = report.root_cause().downcast_ref::<io::Error>().map(io::Error::kind);
    let peer_is_gone = matches!(
        root_kind,
        Some(
              Kind::ConnectionRefused
            | Kind::ConnectionReset
            | Kind::ConnectionAborted
            | Kind::NotConnected
            | Kind::BrokenPipe
            | Kind::WriteZero
            | Kind::UnexpectedEof
        )
    );

    if peer_is_gone { Ok(()) } else { Err(report) }
}

/// Runs one server against several clients using [`drive_pair`], with the server as the leader and
/// the whole group of clients as the follower.
///
/// `server` receives the channel sender together with the total number of clients, as reported by
/// `num_clients()`. Once the server sends its message, one scoped thread per client is spawned,
/// named `client 1`, `client 2`, and so on. Each of them calls `client` with a borrow of that
/// message.
///
/// Before each client thread is spawned, a guard is taken from a `Choke` created with
/// `num_concurrent_clients()`; the guard is moved into the client thread and dropped when the
/// client finishes. (Presumably this is what bounds how many clients run at once; see `Choke`
/// for the exact semantics.)
///
/// # Errors
/// Returns an error if a client thread fails to spawn, if a client thread panics, or if a client
/// returns an error (the first one encountered while joining in spawn order), in addition to
/// everything [`drive_pair`] reports for the server side.
pub fn drive_server_and_multiple_clients<T, B, Srv, Clt>(server: Srv, client: Clt) -> TestResult
where
    T: Send + Borrow<B>,
    B: Send + Sync + ?Sized,
    Srv: FnOnce(Sender<T>, u32) -> TestResult + Send,
    Clt: Fn(&B) -> TestResult + Send + Sync,
{
    let limiter = Choke::new(num_concurrent_clients());
    let total_clients = num_clients();

    let run_server = move |sender: Sender<T>| server(sender, total_clients);

    let run_clients = |msg: T| {
        thread::scope(|scope| {
            let capacity = usize::try_from(total_clients).unwrap();
            let mut handles = Vec::with_capacity(capacity);

            for idx in 1..=total_clients {
                let label = format!("client {idx}");

                // Taken on the driving thread, before the client thread exists.
                let slot = limiter.take();
                let client_fn = &client;
                let shared = msg.borrow();

                let spawned = thread::Builder::new()
                    .name(label.clone())
                    .spawn_scoped(scope, move || {
                        // Binding the guard here moves it into the thread, so that it is dropped
                        // when the client finishes rather than on the driving thread.
                        let _slot = slot;
                        client_fn(shared)
                    });
                handles.push(spawned.with_context(|| format!("{label} thread launch failed"))?);
            }

            for handle in handles {
                match handle.join() {
                    // Return the first error as-is; drive_pair attaches the context.
                    Ok(outcome) => outcome?,
                    Err(_) => {
                        bail!("client thread panicked");
                    }
                }
            }
            Ok(())
        })
    };

    drive_pair(run_server, "server", run_clients, "client")
}