iced_futures/
stream.rs

1//! Create asynchronous streams of data.
2use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
6/// to the [`mpsc::Sender`] provided to the closure.
7///
8/// This is a more ergonomic [`stream::unfold`], which allows you to go
9/// from the "world of futures" to the "world of streams" by simply looping
10/// and publishing to an async channel from inside a [`Future`].
11pub fn channel<T>(
12    size: usize,
13    f: impl AsyncFnOnce(mpsc::Sender<T>),
14) -> impl Stream<Item = T> {
15    let (sender, receiver) = mpsc::channel(size);
16
17    let runner = stream::once(f(sender)).filter_map(|_| async { None });
18
19    stream::select(receiver, runner)
20}
21
22/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
23/// that can fail to the [`mpsc::Sender`] provided to the closure.
24pub fn try_channel<T, E>(
25    size: usize,
26    f: impl AsyncFnOnce(mpsc::Sender<T>) -> Result<(), E>,
27) -> impl Stream<Item = Result<T, E>> {
28    let (sender, receiver) = mpsc::channel(size);
29
30    let runner = stream::once(f(sender)).filter_map(|result| async {
31        match result {
32            Ok(()) => None,
33            Err(error) => Some(Err(error)),
34        }
35    });
36
37    stream::select(receiver.map(Ok), runner)
38}