1//! Create asynchronous streams of data.
2use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
45/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
6/// to the [`mpsc::Sender`] provided to the closure.
7///
8/// This is a more ergonomic [`stream::unfold`], which allows you to go
9/// from the "world of futures" to the "world of streams" by simply looping
10/// and publishing to an async channel from inside a [`Future`].
11pub fn channel<T>(
12 size: usize,
13 f: impl AsyncFnOnce(mpsc::Sender<T>),
14) -> impl Stream<Item = T> {
15let (sender, receiver) = mpsc::channel(size);
1617let runner = stream::once(f(sender)).filter_map(|_| async { None });
1819 stream::select(receiver, runner)
20}
2122/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
23/// that can fail to the [`mpsc::Sender`] provided to the closure.
24pub fn try_channel<T, E>(
25 size: usize,
26 f: impl AsyncFnOnce(mpsc::Sender<T>) -> Result<(), E>,
27) -> impl Stream<Item = Result<T, E>> {
28let (sender, receiver) = mpsc::channel(size);
2930let runner = stream::once(f(sender)).filter_map(|result| async {
31match result {
32Ok(()) => None,
33Err(error) => Some(Err(error)),
34 }
35 });
3637 stream::select(receiver.map(Ok), runner)
38}