File: limit.rs

package info (click to toggle)
rust-async-executor 1.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 252 kB
  • sloc: makefile: 2; sh: 1
file content (95 lines) | stat: -rw-r--r-- 2,908 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//! An executor where you can only push a limited number of tasks.

use async_executor::{Executor, Task};
use async_lock::Semaphore;
use std::{future::Future, sync::Arc, time::Duration};

/// An executor where you can only push a limited number of tasks.
struct LimitedExecutor {
    /// Inner running executor.
    executor: Executor<'static>,

    /// Semaphore limiting the number of tasks.
    semaphore: Arc<Semaphore>,
}

impl LimitedExecutor {
    fn new(max: usize) -> Self {
        Self {
            executor: Executor::new(),
            semaphore: Semaphore::new(max).into(),
        }
    }

    /// Spawn a task, waiting until there is a slot available.
    async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
    where
        F::Output: Send + 'static,
    {
        // Wait for a semaphore permit.
        let permit = self.semaphore.acquire_arc().await;

        // Wrap it into a new future.
        let future = async move {
            let result = future.await;
            drop(permit);
            result
        };

        // Spawn the task.
        self.executor.spawn(future)
    }

    /// Run a future to completion.
    async fn run<F: Future>(&self, future: F) -> F::Output {
        self.executor.run(future).await
    }
}

fn main() {
    futures_lite::future::block_on(async {
        let ex = Arc::new(LimitedExecutor::new(10));
        ex.run({
            let ex = ex.clone();
            async move {
                // Spawn a bunch of tasks that wait for a while.
                for i in 0..15 {
                    ex.spawn(async move {
                        async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
                        println!("Waiting task #{i} finished!");
                    })
                    .await
                    .detach();
                }

                let (start_tx, start_rx) = async_channel::bounded::<()>(1);
                let mut current_rx = start_rx;

                // Send the first message.
                start_tx.send(()).await.unwrap();

                // Spawn a bunch of channel tasks that wake eachother up.
                for i in 0..25 {
                    let (next_tx, next_rx) = async_channel::bounded::<()>(1);

                    ex.spawn(async move {
                        current_rx.recv().await.unwrap();
                        println!("Channel task {i} woken up!");
                        next_tx.send(()).await.unwrap();
                        println!("Channel task {i} finished!");
                    })
                    .await
                    .detach();

                    current_rx = next_rx;
                }

                // Wait for the last task to finish.
                current_rx.recv().await.unwrap();

                println!("All tasks finished!");
            }
        })
        .await;
    });
}