1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
// Copyright (c) 2021 Parity Technologies (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
// An example of how to use of Soketto alongside Hyper, so that we can handle
// standard HTTP traffic with Hyper, and WebSocket connections with Soketto, on
// the same port.
//
// To try this, start up the example (`cargo run --example hyper_server`) and then
// navigate to localhost:3000 and, in the browser JS console, run:
//
// ```
// var socket = new WebSocket("ws://localhost:3000");
// socket.onmessage = function(msg) { console.log(msg) };
// socket.send("Hello!");
// ```
//
// You'll see any messages you send echoed back.
use std::net::SocketAddr;
use futures::io::{BufReader, BufWriter};
use hyper::server::conn::http1;
use hyper::{body::Bytes, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use soketto::{
handshake::http::{is_upgrade_request, Server},
BoxedError,
};
use tokio_util::compat::TokioAsyncReadCompatExt;
type FullBody = http_body_util::Full<Bytes>;
/// Start up a hyper server.
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
env_logger::init();
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
let listener = tokio::net::TcpListener::bind(addr).await?;
log::info!(
"Listening on http://{:?} — connect and I'll echo back anything you send!",
listener.local_addr().unwrap()
);
loop {
let stream = match listener.accept().await {
Ok((stream, addr)) => {
log::info!("Accepting new connection: {addr}");
stream
}
Err(e) => {
log::error!("Accepting new connection failed: {e}");
continue;
}
};
tokio::spawn(async {
let io = TokioIo::new(stream);
let conn = http1::Builder::new().serve_connection(io, service_fn(handler));
// Enable upgrades on the connection for the websocket upgrades to work.
let conn = conn.with_upgrades();
// Log any errors that might have occurred during the connection.
if let Err(err) = conn.await {
log::error!("HTTP connection failed {err}");
}
});
}
}
/// Handle incoming HTTP Requests.
async fn handler(req: Request<hyper::body::Incoming>) -> Result<hyper::Response<FullBody>, BoxedError> {
if is_upgrade_request(&req) {
// Create a new handshake server.
let mut server = Server::new();
// Add any extensions that we want to use.
#[cfg(feature = "deflate")]
{
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
server.add_extension(Box::new(deflate));
}
// Attempt the handshake.
match server.receive_request(&req) {
// The handshake has been successful so far; return the response we're given back
// and spawn a task to handle the long-running WebSocket server:
Ok(response) => {
tokio::spawn(async move {
if let Err(e) = websocket_echo_messages(server, req).await {
log::error!("Error upgrading to websocket connection: {}", e);
}
});
Ok(response.map(|()| FullBody::default()))
}
// We tried to upgrade and failed early on; tell the client about the failure however we like:
Err(e) => {
log::error!("Could not upgrade connection: {}", e);
Ok(Response::new(FullBody::from("Something went wrong upgrading!")))
}
}
} else {
// The request wasn't an upgrade request; let's treat it as a standard HTTP request:
Ok(Response::new(FullBody::from("Hello HTTP!")))
}
}
/// Echo any messages we get from the client back to them
async fn websocket_echo_messages(server: Server, req: Request<hyper::body::Incoming>) -> Result<(), BoxedError> {
// The negotiation to upgrade to a WebSocket connection has been successful so far. Next, we get back the underlying
// stream using `hyper::upgrade::on`, and hand this to a Soketto server to use to handle the WebSocket communication
// on this socket.
//
// Note: awaiting this won't succeed until the handshake response has been returned to the client, so this must be
// spawned on a separate task so as not to block that response being handed back.
let stream = hyper::upgrade::on(req).await?;
let io = TokioIo::new(stream);
let stream = BufReader::new(BufWriter::new(io.compat()));
// Get back a reader and writer that we can use to send and receive websocket messages.
let (mut sender, mut receiver) = server.into_builder(stream).finish();
// Echo any received messages back to the client:
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
if let Ok(txt) = std::str::from_utf8(&message) {
sender.send_text(txt).await?;
sender.flush().await?
} else {
break;
}
}
Err(soketto::connection::Error::Closed) => break,
Err(e) => {
eprintln!("Websocket connection error: {}", e);
break;
}
}
}
Ok(())
}
|