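//! Simulated workload for `tracing-durations-export`: instrumented async
//! network requests, cache reads, and blocking parse steps, run sequentially
//! and in parallel for both uncached and cached requests. When the
//! `TRACING_DURATION_EXPORT` environment variable is set, span durations are
//! exported to that file.
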
use futures::StreamExt;
use rand::Rng;
use std::env;
use std::time::Duration;
use tokio::task::spawn_blocking;
use tracing::instrument;
use tracing_durations_export::DurationsLayerBuilder;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[instrument]
async fn make_network_request(api: &str, id: usize) -> String {
    let millis = rand::thread_rng().gen_range(5..10);
    tokio::time::sleep(Duration::from_millis(millis)).await;
    format!("{api} {id}")
}

#[instrument]
async fn read_cache(id: usize) -> Option<String> {
    let millis = rand::thread_rng().gen_range(1..3);
    tokio::time::sleep(Duration::from_millis(millis)).await;
    // There's a 50% chance there's a cache entry
    if rand::thread_rng().gen_bool(0.5) {
        Some(format!("cached({id})"))
    } else {
        None
    }
}

/// CPU-intensive, blocking method
#[instrument(skip_all)]
fn parse_cache(data: &str) -> String {
    let millis = rand::thread_rng().gen_range(2..6);
    std::thread::sleep(Duration::from_millis(millis));
    format!("from_cache({data})")
}

/// CPU-intensive, blocking method
#[instrument(skip_all)]
fn parse_network(data: &str) -> String {
    let millis = rand::thread_rng().gen_range(3..8);
    std::thread::sleep(Duration::from_millis(millis));
    format!("from_network({data})")
}
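
/// Returns the parsed cache entry if present, otherwise fetches and parses the
/// network response. Parsing is offloaded to the blocking thread pool.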
#[instrument]
async fn cached_network_request(api: &str, id: usize) -> String {
    if let Some(cached) = read_cache(id).await {
        spawn_blocking(move || parse_cache(&cached))
            .await
            .expect("executor died")
    } else {
        let response = make_network_request(api, id).await;
        spawn_blocking(move || parse_network(&response))
            .await
            .expect("executor died")
    }
}

#[tokio::main]
async fn main() {
    let (duration_layer, _guard) = if let Ok(location) = env::var("TRACING_DURATION_EXPORT") {
        let (layer, guard) = DurationsLayerBuilder::default()
            .durations_file(location)
            .build()
            .expect("Couldn't create TRACING_DURATION_FILE");
        (Some(layer), Some(guard))
    } else {
        (None, None)
    };
    tracing_subscriber::registry().with(duration_layer).init();

    // Sequential, uncached: request and parse one id after another
    futures::stream::iter(0..4)
        .then(|id| make_network_request("https://example.org/uncached", id))
        .then(|data| async {
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .collect::<Vec<String>>()
        .await;

    // Spacer
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Parallel, uncached: up to 4 requests in flight at once
    futures::stream::iter(0..4)
        .map(|id| async move {
            let data = make_network_request("https://example.org/uncached", id).await;
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .buffer_unordered(4)
        .collect::<Vec<String>>()
        .await;

    // Spacer
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Sequential, cached
    futures::stream::iter(0..4)
        .then(|id| cached_network_request("https://example.net/cached", id))
        .collect::<Vec<String>>()
        .await;

    // Spacer
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Parallel, cached: up to 3 requests in flight at once
    futures::stream::iter(0..4)
        .map(|id| cached_network_request("https://example.net/cached", id))
        .buffer_unordered(3)
        .collect::<Vec<String>>()
        .await;
}