1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
//! An executor where you can only push a limited number of tasks.
use async_executor::{Executor, Task};
use async_lock::Semaphore;
use std::{future::Future, sync::Arc, time::Duration};
/// An executor where you can only push a limited number of tasks.
struct LimitedExecutor {
/// Inner running executor.
executor: Executor<'static>,
/// Semaphore limiting the number of tasks.
semaphore: Arc<Semaphore>,
}
impl LimitedExecutor {
fn new(max: usize) -> Self {
Self {
executor: Executor::new(),
semaphore: Semaphore::new(max).into(),
}
}
/// Spawn a task, waiting until there is a slot available.
async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
where
F::Output: Send + 'static,
{
// Wait for a semaphore permit.
let permit = self.semaphore.acquire_arc().await;
// Wrap it into a new future.
let future = async move {
let result = future.await;
drop(permit);
result
};
// Spawn the task.
self.executor.spawn(future)
}
/// Run a future to completion.
async fn run<F: Future>(&self, future: F) -> F::Output {
self.executor.run(future).await
}
}
fn main() {
futures_lite::future::block_on(async {
let ex = Arc::new(LimitedExecutor::new(10));
ex.run({
let ex = ex.clone();
async move {
// Spawn a bunch of tasks that wait for a while.
for i in 0..15 {
ex.spawn(async move {
async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
println!("Waiting task #{i} finished!");
})
.await
.detach();
}
let (start_tx, start_rx) = async_channel::bounded::<()>(1);
let mut current_rx = start_rx;
// Send the first message.
start_tx.send(()).await.unwrap();
// Spawn a bunch of channel tasks that wake eachother up.
for i in 0..25 {
let (next_tx, next_rx) = async_channel::bounded::<()>(1);
ex.spawn(async move {
current_rx.recv().await.unwrap();
println!("Channel task {i} woken up!");
next_tx.send(()).await.unwrap();
println!("Channel task {i} finished!");
})
.await
.detach();
current_rx = next_rx;
}
// Wait for the last task to finish.
current_rx.recv().await.unwrap();
println!("All tasks finished!");
}
})
.await;
});
}
|