File: process_stdin.rs

package info (click to toggle)
rust-wasmtime 26.0.1%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 48,492 kB
  • sloc: ansic: 4,003; sh: 561; javascript: 542; cpp: 254; asm: 175; ml: 96; makefile: 55
file content (165 lines) | stat: -rw-r--r-- 5,892 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use std::io::{BufRead, Write};
use std::process::Command;
use wasmtime_wasi::{HostInputStream, Subscribe};

const VAR_NAME: &str = "__CHILD_PROCESS";

fn main() {
    if cfg!(miri) {
        return;
    }
    // Skip this tests if it looks like we're in a cross-compiled situation and
    // we're emulating this test for a different platform. In that scenario
    // emulators (like QEMU) tend to not report signals the same way and such.
    if std::env::vars()
        .filter(|(k, _v)| k.starts_with("CARGO_TARGET") && k.ends_with("RUNNER"))
        .count()
        > 0
    {
        return;
    }

    match std::env::var(VAR_NAME) {
        Ok(_) => child_process(),
        Err(_) => parent_process(),
    }

    fn child_process() {
        let mut result_write = std::io::stderr();
        let mut child_running = true;
        while child_running {
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async {
                    'task: loop {
                        println!("child: creating stdin");
                        let mut stdin = wasmtime_wasi::stdin();

                        println!("child: checking that stdin is not ready");
                        assert!(
                            tokio::time::timeout(
                                std::time::Duration::from_millis(100),
                                stdin.ready()
                            )
                            .await
                            .is_err(),
                            "stdin available too soon"
                        );

                        writeln!(&mut result_write, "start").unwrap();

                        println!("child: started");

                        let mut buffer = String::new();
                        loop {
                            println!("child: waiting for stdin to be ready");
                            stdin.ready().await;

                            println!("child: reading input");
                            // We can't effectively test for the case where stdin was closed, so panic if it is...
                            let bytes = stdin.read(1024).unwrap();

                            println!("child got: {bytes:?}");

                            buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap());
                            if let Some((line, rest)) = buffer.split_once('\n') {
                                if line == "all done" {
                                    writeln!(&mut result_write, "done").unwrap();
                                    println!("child: exiting...");
                                    child_running = false;
                                    break 'task;
                                } else if line == "restart_runtime" {
                                    writeln!(&mut result_write, "restarting").unwrap();
                                    println!("child: restarting runtime...");
                                    break 'task;
                                } else if line == "restart_task" {
                                    writeln!(&mut result_write, "restarting").unwrap();
                                    println!("child: restarting task...");
                                    continue 'task;
                                } else {
                                    writeln!(&mut result_write, "{line}").unwrap();
                                }

                                buffer = rest.to_owned();
                            }
                        }
                    }
                });
            println!("child: runtime exited");
        }
        println!("child: exiting");
    }
}

fn parent_process() {
    let me = std::env::current_exe().unwrap();
    let mut cmd = Command::new(me);
    cmd.env(VAR_NAME, "1");
    cmd.stdin(std::process::Stdio::piped());

    if std::env::args().any(|arg| arg == "--nocapture") {
        cmd.stdout(std::process::Stdio::inherit());
    } else {
        cmd.stdout(std::process::Stdio::null());
    }

    cmd.stderr(std::process::Stdio::piped());
    let mut child = cmd.spawn().unwrap();

    let mut stdin_write = child.stdin.take().unwrap();
    let mut result_read = std::io::BufReader::new(child.stderr.take().unwrap());

    let mut line = String::new();
    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "start\n");

    for i in 0..5 {
        let message = format!("some bytes {i}\n");
        stdin_write.write_all(message.as_bytes()).unwrap();
        line.clear();
        result_read.read_line(&mut line).unwrap();
        assert_eq!(line, message);
    }

    writeln!(&mut stdin_write, "restart_task").unwrap();
    line.clear();
    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "restarting\n");
    line.clear();

    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "start\n");

    for i in 0..10 {
        let message = format!("more bytes {i}\n");
        stdin_write.write_all(message.as_bytes()).unwrap();
        line.clear();
        result_read.read_line(&mut line).unwrap();
        assert_eq!(line, message);
    }

    writeln!(&mut stdin_write, "restart_runtime").unwrap();
    line.clear();
    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "restarting\n");
    line.clear();

    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "start\n");

    for i in 0..17 {
        let message = format!("even more bytes {i}\n");
        stdin_write.write_all(message.as_bytes()).unwrap();
        line.clear();
        result_read.read_line(&mut line).unwrap();
        assert_eq!(line, message);
    }

    writeln!(&mut stdin_write, "all done").unwrap();

    line.clear();
    result_read.read_line(&mut line).unwrap();
    assert_eq!(line, "done\n");
}