File: http_server.rs

package info (click to toggle)
rust-wasmtime 26.0.1%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 48,492 kB
  • sloc: ansic: 4,003; sh: 561; javascript: 542; cpp: 254; asm: 175; ml: 96; makefile: 55
file content (138 lines) | stat: -rw-r--r-- 4,835 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use anyhow::{Context, Result};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Bytes, service::service_fn, Request, Response};
use std::{
    future::Future,
    net::{SocketAddr, TcpStream},
    thread::JoinHandle,
};
use tokio::net::TcpListener;
use wasmtime_wasi_http::io::TokioIo;

async fn test(
    mut req: Request<hyper::body::Incoming>,
) -> http::Result<Response<BoxBody<Bytes, std::convert::Infallible>>> {
    tracing::debug!("preparing mocked response",);
    let method = req.method().to_string();
    let body = req.body_mut().collect().await.unwrap();
    let buf = body.to_bytes();
    tracing::trace!("hyper request body size {:?}", buf.len());

    Response::builder()
        .status(http::StatusCode::OK)
        .header("x-wasmtime-test-method", method)
        .header("x-wasmtime-test-uri", req.uri().to_string())
        .body(Full::<Bytes>::from(buf).boxed())
}

pub struct Server {
    addr: SocketAddr,
    worker: Option<JoinHandle<Result<()>>>,
}

impl Server {
    fn new<F>(
        run: impl FnOnce(TokioIo<tokio::net::TcpStream>) -> F + Send + 'static,
    ) -> Result<Self>
    where
        F: Future<Output = Result<()>>,
    {
        let thread = std::thread::spawn(|| -> Result<_> {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .context("failed to start tokio runtime")?;
            let listener = rt.block_on(async move {
                let addr = SocketAddr::from(([127, 0, 0, 1], 0));
                TcpListener::bind(addr).await.context("failed to bind")
            })?;
            Ok((rt, listener))
        });
        let (rt, listener) = thread.join().unwrap()?;
        let addr = listener.local_addr().context("failed to get local addr")?;
        let worker = std::thread::spawn(move || {
            tracing::debug!("dedicated thread to start listening");
            rt.block_on(async move {
                tracing::debug!("preparing to accept connection");
                let (stream, _) = listener.accept().await.map_err(anyhow::Error::from)?;
                run(TokioIo::new(stream)).await
            })
        });
        Ok(Self {
            worker: Some(worker),
            addr,
        })
    }

    pub fn http1() -> Result<Self> {
        tracing::debug!("initializing http1 server");
        Self::new(|io| async move {
            let mut builder = hyper::server::conn::http1::Builder::new();
            let http = builder.keep_alive(false).pipeline_flush(true);

            tracing::debug!("preparing to bind connection to service");
            let conn = http.serve_connection(io, service_fn(test)).await;
            tracing::trace!("connection result {:?}", conn);
            conn?;
            Ok(())
        })
    }

    pub fn http2() -> Result<Self> {
        tracing::debug!("initializing http2 server");
        Self::new(|io| async move {
            let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);
            let http = builder.max_concurrent_streams(20);

            tracing::debug!("preparing to bind connection to service");
            let conn = http.serve_connection(io, service_fn(test)).await;
            tracing::trace!("connection result {:?}", conn);
            if let Err(e) = &conn {
                let message = e.to_string();
                if message.contains("connection closed before reading preface")
                    || message.contains("unspecific protocol error detected")
                {
                    return Ok(());
                }
            }
            conn?;
            Ok(())
        })
    }

    pub fn addr(&self) -> String {
        format!("localhost:{}", self.addr.port())
    }
}

impl Drop for Server {
    fn drop(&mut self) {
        tracing::debug!("shutting down http1 server");
        // Force a connection to happen in case one hasn't happened already.
        let _ = TcpStream::connect(&self.addr);

        // If the worker fails with an error, report it here but don't panic.
        // Some tests don't make a connection so the error will be that the tcp
        // stream created above is closed immediately afterwards. Let the test
        // independently decide if it failed or not, and this should be in the
        // logs to assist with debugging if necessary.
        let worker = self.worker.take().unwrap();
        if let Err(e) = worker.join().unwrap() {
            eprintln!("worker failed with error {e:?}");
        }
    }
}

#[derive(Clone)]
/// An Executor that uses the tokio runtime.
struct TokioExecutor;

impl<F> hyper::rt::Executor<F> for TokioExecutor
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fn execute(&self, fut: F) {
        tokio::task::spawn(fut);
    }
}