1use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5pub fn channel<T>(
12 size: usize,
13 f: impl AsyncFnOnce(mpsc::Sender<T>),
14) -> impl Stream<Item = T> {
15 let (sender, receiver) = mpsc::channel(size);
16
17 let runner = stream::once(f(sender)).filter_map(|_| async { None });
18
19 stream::select(receiver, runner)
20}
21
22pub fn try_channel<T, E>(
25 size: usize,
26 f: impl AsyncFnOnce(mpsc::Sender<T>) -> Result<(), E>,
27) -> impl Stream<Item = Result<T, E>> {
28 let (sender, receiver) = mpsc::channel(size);
29
30 let runner = stream::once(f(sender)).filter_map(|result| async {
31 match result {
32 Ok(()) => None,
33 Err(error) => Some(Err(error)),
34 }
35 });
36
37 stream::select(receiver.map(Ok), runner)
38}