1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
use anyhow::{Context, Result};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Bytes, service::service_fn, Request, Response};
use std::{
future::Future,
net::{SocketAddr, TcpStream},
thread::JoinHandle,
};
use tokio::net::TcpListener;
use wasmtime_wasi_http::io::TokioIo;
async fn test(
mut req: Request<hyper::body::Incoming>,
) -> http::Result<Response<BoxBody<Bytes, std::convert::Infallible>>> {
tracing::debug!("preparing mocked response",);
let method = req.method().to_string();
let body = req.body_mut().collect().await.unwrap();
let buf = body.to_bytes();
tracing::trace!("hyper request body size {:?}", buf.len());
Response::builder()
.status(http::StatusCode::OK)
.header("x-wasmtime-test-method", method)
.header("x-wasmtime-test-uri", req.uri().to_string())
.body(Full::<Bytes>::from(buf).boxed())
}
pub struct Server {
addr: SocketAddr,
worker: Option<JoinHandle<Result<()>>>,
}
impl Server {
fn new<F>(
run: impl FnOnce(TokioIo<tokio::net::TcpStream>) -> F + Send + 'static,
) -> Result<Self>
where
F: Future<Output = Result<()>>,
{
let thread = std::thread::spawn(|| -> Result<_> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("failed to start tokio runtime")?;
let listener = rt.block_on(async move {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
TcpListener::bind(addr).await.context("failed to bind")
})?;
Ok((rt, listener))
});
let (rt, listener) = thread.join().unwrap()?;
let addr = listener.local_addr().context("failed to get local addr")?;
let worker = std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
rt.block_on(async move {
tracing::debug!("preparing to accept connection");
let (stream, _) = listener.accept().await.map_err(anyhow::Error::from)?;
run(TokioIo::new(stream)).await
})
});
Ok(Self {
worker: Some(worker),
addr,
})
}
pub fn http1() -> Result<Self> {
tracing::debug!("initializing http1 server");
Self::new(|io| async move {
let mut builder = hyper::server::conn::http1::Builder::new();
let http = builder.keep_alive(false).pipeline_flush(true);
tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
conn?;
Ok(())
})
}
pub fn http2() -> Result<Self> {
tracing::debug!("initializing http2 server");
Self::new(|io| async move {
let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);
let http = builder.max_concurrent_streams(20);
tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
if let Err(e) = &conn {
let message = e.to_string();
if message.contains("connection closed before reading preface")
|| message.contains("unspecific protocol error detected")
{
return Ok(());
}
}
conn?;
Ok(())
})
}
pub fn addr(&self) -> String {
format!("localhost:{}", self.addr.port())
}
}
impl Drop for Server {
fn drop(&mut self) {
tracing::debug!("shutting down http1 server");
// Force a connection to happen in case one hasn't happened already.
let _ = TcpStream::connect(&self.addr);
// If the worker fails with an error, report it here but don't panic.
// Some tests don't make a connection so the error will be that the tcp
// stream created above is closed immediately afterwards. Let the test
// independently decide if it failed or not, and this should be in the
// logs to assist with debugging if necessary.
let worker = self.worker.take().unwrap();
if let Err(e) = worker.join().unwrap() {
eprintln!("worker failed with error {e:?}");
}
}
}
#[derive(Clone)]
/// An Executor that uses the tokio runtime.
struct TokioExecutor;
impl<F> hyper::rt::Executor<F> for TokioExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}
|