1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
//! Example usage with nbdkit:
//! nbdkit -U - memory 100M \
//! --run 'cargo run --example concurrent-read-write -- $unixsocket'
//! Or connect over a URI:
//! nbdkit -U - memory 100M \
//! --run 'cargo run --example concurrent-read-write -- "$uri"'
//!
//! This will read and write randomly over the plugin using multi-conn,
//! multiple threads and multiple requests in flight on each thread.
#![deny(warnings)]
use rand::prelude::*;
use std::env;
use std::sync::Arc;
use tokio::task::JoinSet;
/// Number of simultaneous connections to the NBD server.
///
/// Note that some servers only support a limited number of
/// simultaneous connections, and/or have a configurable thread pool
/// internally, and if you exceed those limits then something will break.
const NR_MULTI_CONN: usize = 8;
/// Number of commands that can be "in flight" at the same time on each
/// connection. (Therefore the total number of requests in flight may
/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
const MAX_IN_FLIGHT: usize = 16;
/// The size of large reads and writes, must be > 512.
const BUFFER_SIZE: usize = 1024;
/// Number of commands we issue (per [task][tokio::task]).
const NR_CYCLES: usize = 32;
/// Statistics gathered during the run.
#[derive(Debug, Default)]
struct Stats {
/// The total number of requests made.
requests: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = env::args_os().collect::<Vec<_>>();
if args.len() != 2 {
anyhow::bail!("Usage: {:?} socket", args[0]);
}
// We begin by making a connection to the server to get the export size
// and ensure that it supports multiple connections and is writable.
let nbd = libnbd::Handle::new()?;
// Check if the user provided a URI or a unix socket.
let socket_or_uri = args[1].to_str().unwrap();
if nbd.is_uri(socket_or_uri).unwrap() {
nbd.connect_uri(socket_or_uri)?;
} else {
nbd.connect_unix(socket_or_uri)?;
}
let export_size = nbd.get_size()?;
anyhow::ensure!(
(BUFFER_SIZE as u64) < export_size,
"export is {export_size}B, must be larger than {BUFFER_SIZE}B"
);
anyhow::ensure!(
!nbd.is_read_only()?,
"error: this NBD export is read-only"
);
anyhow::ensure!(
nbd.can_multi_conn()?,
"error: this NBD export does not support multi-conn"
);
drop(nbd); // Close the connection.
// Start the worker tasks, one per connection.
let mut tasks = JoinSet::new();
for i in 0..NR_MULTI_CONN {
tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
}
// Wait for the tasks to complete.
let mut stats = Stats::default();
while !tasks.is_empty() {
let this_stats = tasks.join_next().await.unwrap().unwrap()?;
stats.requests += this_stats.requests;
}
// Make sure the number of requests that were required matches what
// we expect.
assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
Ok(())
}
async fn run_thread(
task_idx: usize,
socket_or_uri: String,
export_size: u64,
) -> anyhow::Result<Stats> {
// Start a new connection to the server.
// We shall spawn many commands concurrently on different tasks and those
// futures must be `'static`, hence we wrap the handle in an [Arc].
let nbd = Arc::new(libnbd::AsyncHandle::new()?);
// Check if the user provided a URI or a unix socket.
if nbd.is_uri(socket_or_uri.clone()).unwrap() {
nbd.connect_uri(socket_or_uri).await?;
} else {
nbd.connect_unix(socket_or_uri).await?;
}
let mut rng = SmallRng::seed_from_u64(task_idx as u64);
// Issue commands.
let mut stats = Stats::default();
let mut join_set = JoinSet::new();
while stats.requests < NR_CYCLES || !join_set.is_empty() {
while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT {
// If we want to issue another request, do so. Note that we reuse
// the same buffer for multiple in-flight requests. It doesn't
// matter here because we're just trying to write random stuff,
// but that would be Very Bad in a real application.
// Simulate a mix of large and small requests.
let size = if rng.gen() { BUFFER_SIZE } else { 512 };
let offset = rng.gen_range(0..export_size - size as u64);
let mut buf = [0u8; BUFFER_SIZE];
let nbd = nbd.clone();
if rng.gen() {
join_set.spawn(async move {
nbd.pread(&mut buf, offset, None).await
});
} else {
// Fill the buf with random data.
rng.fill(&mut buf);
join_set
.spawn(async move { nbd.pwrite(&buf, offset, None).await });
}
stats.requests += 1;
}
join_set.join_next().await.unwrap().unwrap()?;
}
Ok(stats)
}
|