File: cached_network.rs

package info (click to toggle)
rust-tracing-durations-export 0.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 400 kB
  • sloc: python: 260; makefile: 2
file content (115 lines) | stat: -rw-r--r-- 3,529 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::StreamExt;
use rand::Rng;
use std::env;
use std::time::Duration;
use tokio::task::spawn_blocking;
use tracing::instrument;
use tracing_durations_export::DurationsLayerBuilder;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

/// Simulates a network round-trip to `api` for the item `id`.
///
/// Waits asynchronously for a random 5 to 9 milliseconds and then returns the
/// fake response body `"{api} {id}"`.
#[instrument]
async fn make_network_request(api: &str, id: usize) -> String {
    // The delay is computed in its own statement so the RNG temporary is
    // gone before the `.await` below.
    let latency = Duration::from_millis(rand::thread_rng().gen_range(5..10));
    tokio::time::sleep(latency).await;
    format!("{} {}", api, id)
}

/// Simulates a cache lookup for the item `id`.
///
/// Waits asynchronously for a random 1 to 2 milliseconds, then reports either
/// a hit (`Some("cached({id})")`) or a miss (`None`).
#[instrument]
async fn read_cache(id: usize) -> Option<String> {
    let lookup_time = Duration::from_millis(rand::thread_rng().gen_range(1..3));
    tokio::time::sleep(lookup_time).await;
    // Coin flip: there's a 50% chance that a cache entry exists.
    let is_hit = rand::thread_rng().gen_bool(0.5);
    is_hit.then(|| format!("cached({})", id))
}

/// Stand-in for CPU-intensive parsing of a cache entry.
///
/// Blocks the calling thread for a random 2 to 5 milliseconds and returns
/// `"from_cache({data})"`. Because it blocks, the async callers in this file
/// run it through `spawn_blocking`.
#[instrument(skip_all)]
fn parse_cache(data: &str) -> String {
    let busy_for = Duration::from_millis(rand::thread_rng().gen_range(2..6));
    std::thread::sleep(busy_for);
    format!("from_cache({})", data)
}

/// Stand-in for CPU-intensive parsing of a network response.
///
/// Blocks the calling thread for a random 3 to 7 milliseconds and returns
/// `"from_network({data})"`. Because it blocks, the async callers in this
/// file run it through `spawn_blocking`.
#[instrument(skip_all)]
fn parse_network(data: &str) -> String {
    let busy_for = Duration::from_millis(rand::thread_rng().gen_range(3..8));
    std::thread::sleep(busy_for);
    format!("from_network({})", data)
}

/// Fetches the item `id` from `api`, consulting the simulated cache first.
///
/// On a cache hit the cached entry is parsed with `parse_cache`; on a miss
/// the item is requested via `make_network_request` and parsed with
/// `parse_network`. Either way the blocking parse step runs on the blocking
/// thread pool via `spawn_blocking`.
///
/// # Panics
///
/// Panics with "executor died" if the blocking task cannot be joined.
#[instrument]
async fn cached_network_request(api: &str, id: usize) -> String {
    // Pick the parse job first; both arms yield a `JoinHandle<String>`, so
    // the join and its failure handling are written only once below.
    let parse_job = match read_cache(id).await {
        Some(cached) => spawn_blocking(move || parse_cache(&cached)),
        None => {
            let response = make_network_request(api, id).await;
            spawn_blocking(move || parse_network(&response))
        }
    };
    parse_job.await.expect("executor died")
}

/// Runs the demo workload: four uncached and four cached requests, each set
/// once sequentially and once concurrently, separated by short pauses.
///
/// If the `TRACING_DURATION_EXPORT` environment variable is set, a durations
/// layer writing to that location is added to the tracing subscriber;
/// otherwise the subscriber is installed without it.
///
/// # Panics
///
/// Panics if the durations layer cannot be built for the given location, or
/// if a blocking parse task cannot be joined.
#[tokio::main]
async fn main() {
    // `_guard` is bound (not discarded with `_`) so it lives until `main` returns.
    // NOTE(review): presumably the guard flushes the durations file on drop — confirm
    // against the tracing-durations-export docs.
    let (duration_layer, _guard) = if let Ok(location) = env::var("TRACING_DURATION_EXPORT") {
        let (layer, guard) = DurationsLayerBuilder::default()
            .durations_file(location)
            .build()
            // The message names the variable that is actually read above.
            .expect("Couldn't create the durations file given by TRACING_DURATION_EXPORT");
        (Some(layer), Some(guard))
    } else {
        (None, None)
    };
    tracing_subscriber::registry().with(duration_layer).init();

    // Uncached, sequential: `then` drives one request (and then its parse) at a time.
    futures::stream::iter(0..4)
        .then(|id| make_network_request("https://example.org/uncached", id))
        .then(|data| async {
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .collect::<Vec<String>>()
        .await;

    // Spacer between phases
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Uncached, parallel: up to four request+parse pipelines are in flight at once.
    futures::stream::iter(0..4)
        .map(|id| async move {
            let data = make_network_request("https://example.org/uncached", id).await;
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .buffer_unordered(4)
        .collect::<Vec<String>>()
        .await;

    // Spacer between phases
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Cached, sequential
    futures::stream::iter(0..4)
        .then(|id| cached_network_request("https://example.net/cached", id))
        .collect::<Vec<String>>()
        .await;

    // Spacer between phases
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Cached, parallel: at most three of the four requests are in flight at once.
    futures::stream::iter(0..4)
        .map(|id| cached_network_request("https://example.net/cached", id))
        .buffer_unordered(3)
        .collect::<Vec<String>>()
        .await;
}