1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::{client::legacy::Client, rt::TokioIo};
use std::{net::SocketAddr, time::Duration};
use tokio::io;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task;
use hyper_timeout::TimeoutConnector;
async fn spawn_test_server(listener: TcpListener, shutdown_rx: oneshot::Receiver<()>) {
let http = http1::Builder::new();
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut signal = std::pin::pin!(shutdown_rx);
loop {
tokio::select! {
Ok((stream, _addr)) = listener.accept() => {
let io = TokioIo::new(stream);
let conn = http.serve_connection(io, service_fn(handle_request));
// watch this connection
let fut = graceful.watch(conn);
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error serving connection: {:?}", e);
}
});
},
_ = &mut signal => {
eprintln!("graceful shutdown signal received");
break;
}
}
}
tokio::select! {
_ = graceful.shutdown() => {
eprintln!("all connections gracefully closed");
},
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
eprintln!("timed out wait for all connections to close");
}
}
}
async fn handle_request(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let body = req.collect().await.expect("Failed to read body").to_bytes();
assert!(!body.is_empty(), "empty body");
Ok(Response::new(full("finished")))
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
#[tokio::test]
async fn test_upload_timeout() {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr)
.await
.expect("Failed to bind listener");
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let server_addr = listener.local_addr().unwrap();
let server_handle = task::spawn(spawn_test_server(listener, shutdown_rx));
let h = hyper_util::client::legacy::connect::HttpConnector::new();
let mut connector = TimeoutConnector::new(h);
connector.set_read_timeout(Some(Duration::from_millis(500)));
// comment this out and the test will fail
connector.set_reset_reader_on_write(true);
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector);
let body = vec![0; 10 * 1024 * 1024]; // 10MB
let req = Request::post(format!("http://{}/", server_addr))
.body(full(body))
.expect("request builder");
let mut res = client.request(req).await.expect("request failed");
let mut resp_body = Vec::new();
while let Some(frame) = res.body_mut().frame().await {
let bytes = frame
.expect("frame error")
.into_data()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Error when consuming frame"))
.expect("data error");
resp_body.extend_from_slice(&bytes);
}
assert_eq!(res.status(), 200);
assert_eq!(resp_body, b"finished");
let _ = shutdown_tx.send(());
let _ = server_handle.await;
}
|