1use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5pub fn channel<T>(size: usize, f: impl AsyncFnOnce(mpsc::Sender<T>)) -> impl Stream<Item = T> {
12 let (sender, receiver) = mpsc::channel(size);
13
14 let runner = stream::once(f(sender)).filter_map(|_| async { None });
15
16 stream::select(receiver, runner)
17}
18
19pub fn try_channel<T, E>(
22 size: usize,
23 f: impl AsyncFnOnce(mpsc::Sender<T>) -> Result<(), E>,
24) -> impl Stream<Item = Result<T, E>> {
25 let (sender, receiver) = mpsc::channel(size);
26
27 let runner = stream::once(f(sender)).filter_map(|result| async {
28 match result {
29 Ok(()) => None,
30 Err(error) => Some(Err(error)),
31 }
32 });
33
34 stream::select(receiver.map(Ok), runner)
35}