File: hyper_server.rs

package info (click to toggle)
rust-soketto 0.8.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 264 kB
  • sloc: makefile: 2; sh: 1
file content (158 lines) | stat: -rw-r--r-- 5,270 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (c) 2021 Parity Technologies (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

// An example of how to use Soketto alongside Hyper, so that we can handle
// standard HTTP traffic with Hyper, and WebSocket connections with Soketto, on
// the same port.
//
// To try this, start up the example (`cargo run --example hyper_server`) and then
// navigate to localhost:3000 and, in the browser JS console, run:
//
// ```
// var socket = new WebSocket("ws://localhost:3000");
// socket.onmessage = function(msg) { console.log(msg) };
// socket.send("Hello!");
// ```
//
// You'll see any messages you send echoed back.

use std::net::SocketAddr;

use futures::io::{BufReader, BufWriter};
use hyper::server::conn::http1;
use hyper::{body::Bytes, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use soketto::{
	handshake::http::{is_upgrade_request, Server},
	BoxedError,
};
use tokio_util::compat::TokioAsyncReadCompatExt;

type FullBody = http_body_util::Full<Bytes>;

/// Start up a hyper server.
///
/// Binds a TCP listener on `127.0.0.1:3000` and then accepts connections forever, serving each
/// one on its own Tokio task as an HTTP/1 connection whose requests are passed to `handler`.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound or if its local address cannot be read.
/// Failures to accept or to serve an individual connection are only logged.
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
	env_logger::init();

	let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
	let listener = tokio::net::TcpListener::bind(addr).await?;

	// Reading the bound address is an I/O operation; propagate a failure rather than panicking.
	let local_addr = listener.local_addr()?;
	log::info!("Listening on http://{:?} — connect and I'll echo back anything you send!", local_addr);

	loop {
		let stream = match listener.accept().await {
			Ok((stream, addr)) => {
				log::info!("Accepting new connection: {addr}");
				stream
			}
			Err(e) => {
				// A failed accept only affects that one connection attempt; keep listening.
				log::error!("Accepting new connection failed: {e}");
				continue;
			}
		};

		// Serve the connection on its own task so that the accept loop can carry on immediately.
		tokio::spawn(async move {
			let io = TokioIo::new(stream);
			let conn = http1::Builder::new().serve_connection(io, service_fn(handler));

			// Enable upgrades on the connection for the websocket upgrades to work.
			let conn = conn.with_upgrades();

			// Log any errors that might have occurred during the connection.
			if let Err(err) = conn.await {
				log::error!("HTTP connection failed {err}");
			}
		});
	}
}

/// Handle incoming HTTP Requests.
///
/// Requests that `is_upgrade_request` recognises as WebSocket upgrades go through the Soketto
/// handshake. If the handshake succeeds, the handshake response is returned and a task is spawned
/// that runs `websocket_echo_messages` on the upgraded connection. If it fails, the failure is
/// logged and a `400 Bad Request` response is returned. Every other request receives a plain
/// `"Hello HTTP!"` response.
///
/// This function itself never returns `Err`; the `Result` is there to fit the service signature.
async fn handler(req: Request<hyper::body::Incoming>) -> Result<hyper::Response<FullBody>, BoxedError> {
	if is_upgrade_request(&req) {
		// Create a new handshake server.
		let mut server = Server::new();

		// Add any extensions that we want to use.
		#[cfg(feature = "deflate")]
		{
			let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
			server.add_extension(Box::new(deflate));
		}

		// Attempt the handshake.
		match server.receive_request(&req) {
			// The handshake has been successful so far; return the response we're given back
			// and spawn a task to handle the long-running WebSocket server:
			Ok(response) => {
				tokio::spawn(async move {
					if let Err(e) = websocket_echo_messages(server, req).await {
						log::error!("Error upgrading to websocket connection: {}", e);
					}
				});
				// The handshake response carries no body, so attach an empty one.
				Ok(response.map(|()| FullBody::default()))
			}
			// We tried to upgrade and failed early on; tell the client about the failure. The status
			// must not be left at the default `200 OK`, or the failure would look like a success.
			Err(e) => {
				log::error!("Could not upgrade connection: {}", e);
				let mut response = Response::new(FullBody::from("Something went wrong upgrading!"));
				*response.status_mut() = hyper::StatusCode::BAD_REQUEST;
				Ok(response)
			}
		}
	} else {
		// The request wasn't an upgrade request; let's treat it as a standard HTTP request:
		Ok(Response::new(FullBody::from("Hello HTTP!")))
	}
}

/// Echo any messages we get from the client back to them
async fn websocket_echo_messages(server: Server, req: Request<hyper::body::Incoming>) -> Result<(), BoxedError> {
	// The negotiation to upgrade to a WebSocket connection has been successful so far. Next, we get back the underlying
	// stream using `hyper::upgrade::on`, and hand this to a Soketto server to use to handle the WebSocket communication
	// on this socket.
	//
	// Note: awaiting this won't succeed until the handshake response has been returned to the client, so this must be
	// spawned on a separate task so as not to block that response being handed back.
	let stream = hyper::upgrade::on(req).await?;
	let io = TokioIo::new(stream);
	let stream = BufReader::new(BufWriter::new(io.compat()));

	// Get back a reader and writer that we can use to send and receive websocket messages.
	let (mut sender, mut receiver) = server.into_builder(stream).finish();

	// Echo any received messages back to the client:
	let mut message = Vec::new();
	loop {
		message.clear();
		match receiver.receive_data(&mut message).await {
			Ok(soketto::Data::Binary(n)) => {
				assert_eq!(n, message.len());
				sender.send_binary_mut(&mut message).await?;
				sender.flush().await?
			}
			Ok(soketto::Data::Text(n)) => {
				assert_eq!(n, message.len());
				if let Ok(txt) = std::str::from_utf8(&message) {
					sender.send_text(txt).await?;
					sender.flush().await?
				} else {
					break;
				}
			}
			Err(soketto::connection::Error::Closed) => break,
			Err(e) => {
				eprintln!("Websocket connection error: {}", e);
				break;
			}
		}
	}

	Ok(())
}