1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
A version of [async-stream](https://github.com/tokio-rs/async-stream) without macros.
This crate provides generic implementations of `Stream` trait.
`Stream` is an asynchronous version of `std::iter::Iterator`.
Two functions are provided - `fn_stream` and `try_fn_stream`.
# Usage
## Basic Usage
If you need to create a stream that may result in error, use `try_fn_stream`, otherwise use `fn_stream`.
To create a stream:
1. Invoke `fn_stream` or `try_fn_stream`, passing a closure (anonymous function).
2. Closure will accept an `emitter`.
To return value from the stream, call `.emit(value)` on `emitter` and `.await` on its result.
Once stream consumer has processed the value and called `.next()` on stream, `.await` will return.
## Returning errors
`try_fn_stream` provides some conveniences for returning errors:
1. Errors can be return from closure via `return Err(...)` or the question mark (`?`) operator.
This will end the stream.
2. An `emitter` also has an `emit_err()` method to return errors without ending the stream.
# Examples
Finite stream of numbers
```rust
use async_fn_stream::fn_stream;
use futures_util::Stream;
fn build_stream() -> impl Stream<Item = i32> {
fn_stream(|emitter| async move {
for i in 0..3 {
// yield elements from stream via `emitter`
emitter.emit(i).await;
}
})
}
```
Read numbers from text file, with error handling
```rust
use anyhow::Context;
use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
try_fn_stream(|emitter| async move {
// Return errors via `?` operator.
let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
pin_mut!(file);
let mut line = String::new();
loop {
line.clear();
let byte_count = file
.read_line(&mut line)
.await
.context("Failed to read line")?;
if byte_count == 0 {
break;
}
for token in line.split_ascii_whitespace() {
let Ok(number) = token.parse::<i32>() else {
// Return errors via the `emit_err` method.
emitter.emit_err(
anyhow::anyhow!("Failed to convert string \"{token}\" to number")
).await;
continue;
};
emitter.emit(number).await;
}
}
Ok(())
})
}
```
# Why not `async-stream`?
[async-stream](https://github.com/tokio-rs/async-stream) is great!
It has a nice syntax, but it is based on macros which brings some flaws:
* proc-macros sometimes interacts badly with IDEs such as rust-analyzer or IntelliJ Rust.
see e.g. <https://github.com/rust-lang/rust-analyzer/issues/11533>
* proc-macros may increase build times
|