Description: Update from http 0.2 stack to http 1 stack
Author: Peter Michael Green <plugwash@debian.org>


Index: dockworker/Cargo.toml
===================================================================
--- dockworker.orig/Cargo.toml
+++ dockworker/Cargo.toml
@@ -51,24 +51,34 @@ features = ["std"]
 default-features = false
 
 [dependencies.http]
-version = "0.2"
+version = "1"
 
 [dependencies.hyper]
-version = "0.14"
+version = "1"
 features = [
     "client",
     "http1",
-    "stream",
-    "tcp",
+#    "stream",
+#    "tcp",
 ]
 
+[dependencies.hyper-util]
+version = "0.1"
+features = ["client","client-legacy","http1"]
+
+[dependencies.http-body]
+version = "1"
+
+[dependencies.http-body-util]
+version = "0.1"
+
 [dependencies.hyper-rustls]
-version = "0.24"
+version = "0.27"
 features = ["http2"]
 optional = true
 
 [dependencies.hyper-tls]
-version = "0.5"
+version = "0.6"
 optional = true
 
 [dependencies.log]
@@ -87,7 +97,7 @@ version = "0.10"
 optional = true
 
 [dependencies.rustls]
-version = "0.21"
+version = "0.23"
 optional = true
 
 [dependencies.rustls-pemfile]
@@ -136,7 +146,7 @@ version = "2"
 version = "0.8"
 
 [dev-dependencies.reqwest]
-version = "0.11.24"
+version = "0.12"
 
 [features]
 default = []
@@ -153,7 +163,7 @@ ssl-rustls = [
 ]
 
 [target."cfg(unix)".dependencies.hyperlocal]
-version = "0.8"
+version = "0.9"
 
 [badges.appveyor]
 branch = "master"
Index: dockworker/src/docker.rs
===================================================================
--- dockworker.orig/src/docker.rs
+++ dockworker/src/docker.rs
@@ -29,14 +29,29 @@ use serde::de::DeserializeOwned;
 use std::env;
 use std::path::{Path, PathBuf};
 use std::time::Duration;
-
-async fn into_aframe_stream(
-    body: hyper::Body,
-) -> Result<BoxStream<'static, Result<AttachResponseFrame, DwError>>, DwError> {
+use http_body_util::Full;
+use http_body_util::BodyExt;
+use hyper::body::Body;
+use core::ops::Deref;
+use std::error::Error as StdError;
+
+async fn into_aframe_stream<B>(
+    body: B
+) -> Result<BoxStream<'static, Result<AttachResponseFrame, DwError>>, DwError>
+where
+    B: Body + 'static,
+    B: Send,
+    B: Unpin,
+    <B as Body>::Error: StdError,
+    <B as Body>::Error: Send,
+    <B as Body>::Error: Sync,
+    <B as Body>::Error: 'static,
+    <B as Body>::Data: Send,
+{
     use futures::stream::StreamExt;
     use futures::stream::TryStreamExt;
     let mut aread = tokio_util::io::StreamReader::new(
-        body.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
+        body.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)).into_data_stream(),
     );
     let mut buf = [0u8; 8];
     let src = async_stream::stream! {
@@ -84,29 +99,55 @@ async fn into_aframe_stream(
     Ok(src.boxed())
 }
 
-async fn into_docker_error(body: hyper::Body) -> Result<DockerError, DwError> {
-    let body = hyper::body::to_bytes(body).await?;
+async fn into_docker_error<B>(body: B) -> Result<DockerError, DwError>
+where
+    B: Body,
+    B: Send,
+    crate::errors::Error: From<<B as Body>::Error>,
+    <B as Body>::Error: Send,
+    <B as Body>::Error: Sync,
+    <B as Body>::Error: 'static,
+    <B as Body>::Data: Send,
+{
+    //let body = hyper::body::to_bytes(body).await?;
+    let body = body.collect().await?.to_bytes();
     let err = serde_json::from_slice::<DockerError>(body.as_ref())?;
     Ok(err)
 }
 
-fn into_lines(body: hyper::Body) -> Result<BoxStream<'static, Result<String, DwError>>, DwError> {
+fn into_lines<B>(body: B) -> Result<BoxStream<'static, Result<String, DwError>>, DwError> 
+where
+    B: Body + 'static,
+    B: Send,
+    <B as Body>::Error: StdError, 
+    <B as Body>::Error: Send,
+    <B as Body>::Error: Sync, 
+    <B as Body>::Error: 'static,
+    <B as Body>::Data: Send,
+{
     use futures::stream::StreamExt;
     use futures::stream::TryStreamExt;
     use tokio::io::AsyncBufReadExt;
     let aread = tokio_util::io::StreamReader::new(
-        body.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
+        body.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)).into_data_stream(),
     );
     let stream = tokio_stream::wrappers::LinesStream::new(aread.lines());
     let stream = stream.map_err(Into::into).boxed();
     Ok(stream)
 }
 
-pub fn into_jsonlines<T>(
-    body: hyper::Body,
+pub fn into_jsonlines<T,B>(
+    body: B,
 ) -> Result<BoxStream<'static, Result<T, DwError>>, DwError>
 where
     T: DeserializeOwned,
+    B: Body + 'static,
+    B: Send,
+    <B as Body>::Error: StdError, 
+    <B as Body>::Error: Send,
+    <B as Body>::Error: Sync, 
+    <B as Body>::Error: 'static,
+    <B as Body>::Data: Send,
 {
     use futures::StreamExt;
     let o = into_lines(body)?;
@@ -840,7 +881,7 @@ impl Docker {
         if res.status().is_success() {
             use futures::stream::StreamExt;
             use futures::stream::TryStreamExt;
-            Ok(res.into_body().map_err(DwError::from).boxed())
+            Ok(res.into_body().map_err(DwError::from).into_data_stream().boxed())
         } else {
             Err(into_docker_error(res.into_body()).await?.into())
         }
@@ -1133,7 +1174,7 @@ impl Docker {
         if res.status().is_success() {
             use futures::stream::StreamExt;
             use futures::stream::TryStreamExt;
-            Ok(res.into_body().map_err(Into::into).boxed())
+            Ok(res.into_body().map_err(Into::into).into_data_stream().boxed())
         } else {
             Err(into_docker_error(res.into_body()).await?.into())
         }
@@ -1280,7 +1321,7 @@ impl Docker {
         if res.status().is_success() {
             use futures::stream::StreamExt;
             use futures::stream::TryStreamExt;
-            Ok(res.into_body().map_err(Into::into).boxed())
+            Ok(res.into_body().map_err(Into::into).into_data_stream().boxed())
         } else {
             Err(into_docker_error(res.into_body()).await?.into())
         }
@@ -1293,7 +1334,7 @@ impl Docker {
     pub async fn ping(&self) -> Result<(), DwError> {
         let res = self.http_client().get(self.headers(), "/_ping").await?;
         if res.status().is_success() {
-            let buf = String::from_utf8(res.into_body().to_vec()).unwrap();
+            let buf = String::from_utf8(res.into_body()).unwrap();
             assert_eq!(&buf, "OK");
             Ok(())
         } else {
Index: dockworker/src/errors.rs
===================================================================
--- dockworker.orig/src/errors.rs
+++ dockworker/src/errors.rs
@@ -18,6 +19,8 @@ pub enum Error {
     Envvar(#[from] env::VarError),
     #[error("hyper error")]
     Hyper(#[from] hyper::Error),
+    #[error("hyper util error")]
+    HyperUtil(#[from] hyper_util::client::legacy::Error),
     #[error("json error")]
     Json(#[from] serde_json::Error),
     #[error("docker error")]
Index: dockworker/src/http_client.rs
===================================================================
--- dockworker.orig/src/http_client.rs
+++ dockworker/src/http_client.rs
@@ -1,10 +1,14 @@
 use http::{HeaderMap, Response};
 use std::path::Path;
+use http_body::Body;
+use http_body_util::Full;
+use bytes::Bytes;
 
 /// A http client
 #[async_trait::async_trait]
 pub trait HttpClient {
     type Err: Send + 'static;
+    type Incoming: Body;
 
     async fn get(&self, headers: &HeaderMap, path: &str) -> Result<Response<Vec<u8>>, Self::Err>;
 
@@ -12,7 +16,7 @@ pub trait HttpClient {
         &self,
         headers: &HeaderMap,
         path: &str,
-    ) -> Result<Response<hyper::Body>, Self::Err>;
+    ) -> Result<Response<Self::Incoming>, Self::Err>;
 
     async fn head(&self, headers: &HeaderMap, path: &str) -> Result<HeaderMap, Self::Err>;
 
@@ -28,7 +32,7 @@ pub trait HttpClient {
         headers: &HeaderMap,
         path: &str,
         body: &str,
-    ) -> Result<Response<hyper::Body>, Self::Err>;
+    ) -> Result<Response<Self::Incoming>, Self::Err>;
 
     async fn post_file(
         &self,
@@ -42,7 +46,7 @@ pub trait HttpClient {
         headers: &HeaderMap,
         path: &str,
         file: &Path,
-    ) -> Result<Response<hyper::Body>, Self::Err>;
+    ) -> Result<Response<Self::Incoming>, Self::Err>;
 
     async fn delete(&self, headers: &HeaderMap, path: &str)
         -> Result<Response<Vec<u8>>, Self::Err>;
Index: dockworker/src/hyper_client.rs
===================================================================
--- dockworker.orig/src/hyper_client.rs
+++ dockworker/src/hyper_client.rs
@@ -4,21 +4,26 @@ use http::{HeaderMap, Request, Response}
 use hyper::Uri;
 use std::path::Path;
 use std::str::FromStr;
+use http_body_util::Full;
+use bytes::Bytes;
+use hyper_util::rt::TokioExecutor;
+use hyper::body::Incoming;
+use http_body_util::BodyExt;
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Clone, Debug)]
 enum Client {
-    HttpClient(hyper::Client<hyper::client::HttpConnector>),
+    HttpClient(hyper_util::client::legacy::Client<hyper_util::client::legacy::connect::HttpConnector,Full<Bytes>>),
     #[cfg(all(feature = "openssl", feature = "hyper-tls"))]
-    HttpsClient(hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>),
+    HttpsClient(hyper_util::client::legacy::Client<hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,Full<Bytes>>),
     #[cfg(all(feature = "rustls", feature = "hyper-rustls"))]
-    HttpsClient(hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>),
+    HttpsClient(hyper_util::client::legacy::Client<hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,Full<Bytes>>),
     #[cfg(unix)]
-    UnixClient(hyper::Client<hyperlocal::UnixConnector>),
+    UnixClient(hyper_util::client::legacy::Client<hyperlocal::UnixConnector,Full<Bytes>>),
 }
 
 impl Client {
-    fn request(&self, req: Request<hyper::Body>) -> hyper::client::ResponseFuture {
+    fn request(&self, req: Request<Full<Bytes>>) -> hyper_util::client::legacy::ResponseFuture {
         match self {
             Client::HttpClient(http_client) => http_client.request(req),
             #[cfg(all(feature = "openssl", feature = "hyper-tls"))]
@@ -60,18 +65,18 @@ fn request_builder(
     request
 }
 
-async fn request_with_redirect<T: Into<hyper::Body> + Sync + Send + 'static + Clone>(
+async fn request_with_redirect<T: Into<Full<Bytes>> + Sync + Send + 'static + Clone>(
     client: Client,
     method: http::Method,
     uri: Uri,
     headers: HeaderMap,
     body: Option<T>,
-) -> Result<http::Response<hyper::Body>, DwError> {
+) -> Result<http::Response<Incoming>, DwError> {
     let request =
         request_builder(&method, &uri, &headers).body(if let Some(body) = body.clone() {
             body.into()
         } else {
-            hyper::Body::empty()
+            Full::new(Bytes::new())
         })?;
     let mut future = client.request(request);
     let mut max_redirects = 10;
@@ -107,11 +112,11 @@ async fn request_with_redirect<T: Into<h
                 request = request.uri(location.clone());
 
                 future = client.request(if see_other {
-                    request.body(hyper::Body::empty()).unwrap()
+                    request.body(Full::new(Bytes::new())).unwrap()
                 } else if let Some(body) = body.clone() {
                     request.body(body.into()).unwrap()
                 } else {
-                    request.body(hyper::Body::empty()).unwrap()
+                    request.body(Full::new(Bytes::new())).unwrap()
                 });
 
                 max_redirects -= 1;
@@ -120,9 +125,10 @@ async fn request_with_redirect<T: Into<h
     }
 }
 
-async fn fetch_body(resp: http::Response<hyper::Body>) -> Result<http::Response<Vec<u8>>, DwError> {
+async fn fetch_body(resp: http::Response<Incoming>) -> Result<http::Response<Vec<u8>>, DwError> {
     let (p, b) = resp.into_parts();
-    let b = hyper::body::to_bytes(b).await?.to_vec();
+    //let b = hyper::body::to_bytes(b).await?.to_vec();
+    let b = b.collect().await?.to_bytes().to_vec();
     Ok(Response::from_parts(p, b))
 }
 
@@ -137,7 +143,7 @@ impl HyperClient {
         let url = hyperlocal::Uri::new(path, "").into();
         // Prevent from using connection pooling.
         // See https://github.com/hyperium/hyper/issues/2312.
-        let client: hyper::Client<_> = hyper::Client::builder()
+        let client: hyper_util::client::legacy::Client<_,_> = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
             .pool_idle_timeout(std::time::Duration::from_millis(0))
             .pool_max_idle_per_host(0)
             .build(hyperlocal::UnixConnector);
@@ -171,10 +177,10 @@ impl HyperClient {
             var: addr_https,
             source: err,
         })?;
-        let mut http = hyper::client::HttpConnector::new();
+        let mut http = hyper_util::client::legacy::connect::HttpConnector::new();
         http.enforce_http(false);
         let https = hyper_tls::HttpsConnector::from((http, builder.build()?.into()));
-        let client = hyper::Client::builder().build::<_, hyper::Body>(https);
+        let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
         Ok(Self::new(Client::HttpsClient(client), url))
     }
 
@@ -186,7 +192,7 @@ impl HyperClient {
         ca: &Path,
     ) -> Result<Self, DwError> {
         use log::warn;
-        use rustls::{Certificate, PrivateKey};
+        //use rustls::{Certificate, PrivateKey};
         use rustls_pemfile::Item;
         use std::fs::File;
         use std::io::BufReader;
@@ -203,40 +209,36 @@ impl HyperClient {
 
         let private_key = match rustls_pemfile::rsa_private_keys(&mut key_buf).collect::<Result<Vec<_>,_>>()? {
             keys if keys.is_empty() => return Err(rustls::Error::NoCertificatesPresented.into()),
-            mut keys if keys.len() == 1 => PrivateKey(keys.remove(0).secret_pkcs1_der().to_vec()),
+            mut keys if keys.len() == 1 => keys.remove(0),
             mut keys => {
                 // if keys.len() > 1
                 warn!("Private key file contains multiple keys. Using only first one.");
-                PrivateKey(keys.remove(0).secret_pkcs1_der().to_vec())
+                keys.remove(0)
             }
         };
         let certs = rustls_pemfile::read_all(&mut cert_buf)
             .into_iter()
             .filter_map(|item| match item {
-                Ok(Item::X509Certificate(c)) => Some(Ok(Certificate(c.to_vec()))),
+                Ok(Item::X509Certificate(c)) => Some(Ok(c)),
                 Err(e) => Some(Err(e)),
                 _ => None,
             })
             .collect::<Result<_,_>>()?;
         let mut root_certs = rustls::RootCertStore::empty();
         for c in rustls_pemfile::certs(&mut ca_buf) {
-            root_certs.add(&Certificate(c?.to_vec()))?;
+            root_certs.add(c?)?;
         }
 
         let config = rustls::ClientConfig::builder()
-            .with_safe_default_cipher_suites()
-            .with_safe_default_kx_groups()
-            .with_safe_default_protocol_versions()
-            .unwrap()
             .with_root_certificates(root_certs)
-            .with_single_cert(certs, private_key)
+            .with_client_auth_cert(certs, rustls::pki_types::PrivateKeyDer::Pkcs1(private_key))
             .expect("bad certificate/key");
         let https = hyper_rustls::HttpsConnectorBuilder::new()
             .with_tls_config(config)
             .https_or_http()
             .enable_all_versions()
             .build();
-        let client = hyper::Client::builder().build::<_, hyper::Body>(https);
+        let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
         Ok(Self::new(Client::HttpsClient(client), url))
     }
 
@@ -247,13 +249,14 @@ impl HyperClient {
             var: addr_https,
             source: err,
         })?;
-        Ok(Self::new(Client::HttpClient(hyper::Client::new()), url))
+        Ok(Self::new(Client::HttpClient(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http()), url))
     }
 }
 
 #[async_trait::async_trait]
 impl HttpClient for HyperClient {
     type Err = DwError;
+    type Incoming = Incoming;
 
     async fn get(&self, headers: &HeaderMap, path: &str) -> Result<Response<Vec<u8>>, Self::Err> {
         let url = join_uri(&self.base, path)?;
@@ -273,7 +276,7 @@ impl HttpClient for HyperClient {
         &self,
         headers: &HeaderMap,
         path: &str,
-    ) -> Result<Response<hyper::Body>, Self::Err> {
+    ) -> Result<Response<Incoming>, Self::Err> {
         let url = join_uri(&self.base, path)?;
 
         let res = request_with_redirect::<Vec<u8>>(
@@ -327,7 +330,7 @@ impl HttpClient for HyperClient {
         headers: &HeaderMap,
         path: &str,
         body: &str,
-    ) -> Result<Response<hyper::Body>, Self::Err> {
+    ) -> Result<Response<Incoming>, Self::Err> {
         let url = join_uri(&self.base, path)?;
 
         let res = request_with_redirect(
@@ -371,7 +374,7 @@ impl HttpClient for HyperClient {
         headers: &HeaderMap,
         path: &str,
         file: &Path,
-    ) -> Result<Response<hyper::Body>, Self::Err> {
+    ) -> Result<Response<Incoming>, Self::Err> {
         let mut content = tokio::fs::File::open(file).await?;
         let url = join_uri(&self.base, path)?;
 
Index: dockworker/src/test.rs
===================================================================
--- dockworker.orig/src/test.rs
+++ dockworker/src/test.rs
@@ -9,6 +9,7 @@ use crate::process::Top;
 use crate::stats::Stats;
 use crate::system::SystemInfo;
 use crate::version::Version;
+use hyper::body::Incoming;
 
 #[test]
 fn get_containers() {
@@ -32,7 +33,7 @@ fn get_stats_suspended() {
 #[tokio::test]
 async fn get_stats_streaming() {
     let res = get_stats_response();
-    let src = crate::docker::into_jsonlines::<Stats>(res.into_body()).unwrap();
+    let src = crate::docker::into_jsonlines::<Stats,_>(res.into_body()).unwrap();
     use futures::stream::StreamExt;
     let stats = src
         .collect::<Vec<_>>()
@@ -150,11 +151,11 @@ fn get_version_response() -> &'static st
     include_str!("fixtures/version.json")
 }
 
-fn get_stats_response() -> http::Response<hyper::Body> {
+fn get_stats_response() -> http::Response<String> {
     let response = http::Response::builder()
         .status(http::StatusCode::OK)
         .header("Transfer-Encoding", "chunked")
         .header("Connection", "Close");
     let body = include_str!("fixtures/stats_stream.json").to_string();
-    response.body(hyper::Body::from(body)).unwrap()
+    response.body(body).unwrap()
 }
