iced_futures/
stream.rs

1//! Create asynchronous streams of data.
2use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
6/// to the [`mpsc::Sender`] provided to the closure.
7///
8/// This is a more ergonomic [`stream::unfold`], which allows you to go
9/// from the "world of futures" to the "world of streams" by simply looping
10/// and publishing to an async channel from inside a [`Future`].
11pub fn channel<T>(size: usize, f: impl AsyncFnOnce(mpsc::Sender<T>)) -> impl Stream<Item = T> {
12    let (sender, receiver) = mpsc::channel(size);
13
14    let runner = stream::once(f(sender)).filter_map(|_| async { None });
15
16    stream::select(receiver, runner)
17}
18
19/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
20/// that can fail to the [`mpsc::Sender`] provided to the closure.
21pub fn try_channel<T, E>(
22    size: usize,
23    f: impl AsyncFnOnce(mpsc::Sender<T>) -> Result<(), E>,
24) -> impl Stream<Item = Result<T, E>> {
25    let (sender, receiver) = mpsc::channel(size);
26
27    let runner = stream::once(f(sender)).filter_map(|result| async {
28        match result {
29            Ok(()) => None,
30            Err(error) => Some(Err(error)),
31        }
32    });
33
34    stream::select(receiver.map(Ok), runner)
35}