iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::sync::Arc;
11
12#[doc(no_inline)]
13pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
14
15/// A set of concurrent actions to be performed by the iced runtime.
16///
17/// A [`Task`] _may_ produce a bunch of values of type `T`.
18#[allow(missing_debug_implementations)]
19#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
20pub struct Task<T>(Option<BoxStream<Action<T>>>);
21
22impl<T> Task<T> {
23    /// Creates a [`Task`] that does nothing.
24    pub fn none() -> Self {
25        Self(None)
26    }
27
28    /// Creates a new [`Task`] that instantly produces the given value.
29    pub fn done(value: T) -> Self
30    where
31        T: MaybeSend + 'static,
32    {
33        Self::future(future::ready(value))
34    }
35
36    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
37    /// output with the given closure.
38    pub fn perform<A>(
39        future: impl Future<Output = A> + MaybeSend + 'static,
40        f: impl FnOnce(A) -> T + MaybeSend + 'static,
41    ) -> Self
42    where
43        T: MaybeSend + 'static,
44        A: MaybeSend + 'static,
45    {
46        Self::future(future.map(f))
47    }
48
49    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
50    /// item with the given closure.
51    pub fn run<A>(
52        stream: impl Stream<Item = A> + MaybeSend + 'static,
53        f: impl Fn(A) -> T + MaybeSend + 'static,
54    ) -> Self
55    where
56        T: 'static,
57    {
58        Self::stream(stream.map(f))
59    }
60
61    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
62    /// progress with the first closure and the output with the second one.
63    pub fn sip<S>(
64        sipper: S,
65        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
66        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
67    ) -> Self
68    where
69        S: sipper::Core + MaybeSend + 'static,
70        T: MaybeSend + 'static,
71    {
72        Self::stream(stream(sipper::sipper(move |sender| async move {
73            on_output(sipper.with(on_progress).run(sender).await)
74        })))
75    }
76
77    /// Combines the given tasks and produces a single [`Task`] that will run all of them
78    /// in parallel.
79    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
80    where
81        T: 'static,
82    {
83        Self(Some(boxed_stream(stream::select_all(
84            tasks.into_iter().filter_map(|task| task.0),
85        ))))
86    }
87
88    /// Maps the output of a [`Task`] with the given closure.
89    pub fn map<O>(
90        self,
91        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
92    ) -> Task<O>
93    where
94        T: MaybeSend + 'static,
95        O: MaybeSend + 'static,
96    {
97        self.then(move |output| Task::done(f(output)))
98    }
99
100    /// Performs a new [`Task`] for every output of the current [`Task`] using the
101    /// given closure.
102    ///
103    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
104    /// [`Stream`].
105    pub fn then<O>(
106        self,
107        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
108    ) -> Task<O>
109    where
110        T: MaybeSend + 'static,
111        O: MaybeSend + 'static,
112    {
113        Task(match self.0 {
114            None => None,
115            Some(stream) => {
116                Some(boxed_stream(stream.flat_map(move |action| {
117                    match action.output() {
118                        Ok(output) => f(output)
119                            .0
120                            .unwrap_or_else(|| boxed_stream(stream::empty())),
121                        Err(action) => {
122                            boxed_stream(stream::once(async move { action }))
123                        }
124                    }
125                })))
126            }
127        })
128    }
129
130    /// Chains a new [`Task`] to be performed once the current one finishes completely.
131    pub fn chain(self, task: Self) -> Self
132    where
133        T: 'static,
134    {
135        match self.0 {
136            None => task,
137            Some(first) => match task.0 {
138                None => Task(Some(first)),
139                Some(second) => Task(Some(boxed_stream(first.chain(second)))),
140            },
141        }
142    }
143
144    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
145    pub fn collect(self) -> Task<Vec<T>>
146    where
147        T: MaybeSend + 'static,
148    {
149        match self.0 {
150            None => Task::done(Vec::new()),
151            Some(stream) => Task(Some(boxed_stream(
152                stream::unfold(
153                    (stream, Some(Vec::new())),
154                    move |(mut stream, outputs)| async move {
155                        let mut outputs = outputs?;
156
157                        let Some(action) = stream.next().await else {
158                            return Some((
159                                Some(Action::Output(outputs)),
160                                (stream, None),
161                            ));
162                        };
163
164                        match action.output() {
165                            Ok(output) => {
166                                outputs.push(output);
167
168                                Some((None, (stream, Some(outputs))))
169                            }
170                            Err(action) => {
171                                Some((Some(action), (stream, Some(outputs))))
172                            }
173                        }
174                    },
175                )
176                .filter_map(future::ready),
177            ))),
178        }
179    }
180
181    /// Creates a new [`Task`] that discards the result of the current one.
182    ///
183    /// Useful if you only care about the side effects of a [`Task`].
184    pub fn discard<O>(self) -> Task<O>
185    where
186        T: MaybeSend + 'static,
187        O: MaybeSend + 'static,
188    {
189        self.then(|_| Task::none())
190    }
191
192    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
193    pub fn abortable(self) -> (Self, Handle)
194    where
195        T: 'static,
196    {
197        match self.0 {
198            Some(stream) => {
199                let (stream, handle) = stream::abortable(stream);
200
201                (
202                    Self(Some(boxed_stream(stream))),
203                    Handle {
204                        internal: InternalHandle::Manual(handle),
205                    },
206                )
207            }
208            None => (
209                Self(None),
210                Handle {
211                    internal: InternalHandle::Manual(
212                        stream::AbortHandle::new_pair().0,
213                    ),
214                },
215            ),
216        }
217    }
218
219    /// Creates a new [`Task`] that runs the given [`Future`] and produces
220    /// its output.
221    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
222    where
223        T: 'static,
224    {
225        Self::stream(stream::once(future))
226    }
227
228    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
229    /// each of its items.
230    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
231    where
232        T: 'static,
233    {
234        Self(Some(boxed_stream(stream.map(Action::Output))))
235    }
236}
237
238/// A handle to a [`Task`] that can be used for aborting it.
239#[derive(Debug, Clone)]
240pub struct Handle {
241    internal: InternalHandle,
242}
243
244#[derive(Debug, Clone)]
245enum InternalHandle {
246    Manual(stream::AbortHandle),
247    AbortOnDrop(Arc<stream::AbortHandle>),
248}
249
250impl InternalHandle {
251    pub fn as_ref(&self) -> &stream::AbortHandle {
252        match self {
253            InternalHandle::Manual(handle) => handle,
254            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
255        }
256    }
257}
258
259impl Handle {
260    /// Aborts the [`Task`] of this [`Handle`].
261    pub fn abort(&self) {
262        self.internal.as_ref().abort();
263    }
264
265    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
266    /// all of its instances are dropped.
267    ///
268    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
269    /// once all of the clones are dropped.
270    ///
271    /// This can be really useful if you do not want to worry about calling
272    /// [`Handle::abort`] yourself.
273    pub fn abort_on_drop(self) -> Self {
274        match &self.internal {
275            InternalHandle::Manual(handle) => Self {
276                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
277            },
278            InternalHandle::AbortOnDrop(_) => self,
279        }
280    }
281
282    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
283    pub fn is_aborted(&self) -> bool {
284        self.internal.as_ref().is_aborted()
285    }
286}
287
288impl Drop for Handle {
289    fn drop(&mut self) {
290        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
291            let handle = std::mem::replace(
292                handle,
293                Arc::new(stream::AbortHandle::new_pair().0),
294            );
295
296            if let Some(handle) = Arc::into_inner(handle) {
297                handle.abort();
298            }
299        }
300    }
301}
302
303impl<T> Task<Option<T>> {
304    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
305    ///
306    /// The value is provided to the closure to create the subsequent [`Task`].
307    pub fn and_then<A>(
308        self,
309        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
310    ) -> Task<A>
311    where
312        T: MaybeSend + 'static,
313        A: MaybeSend + 'static,
314    {
315        self.then(move |option| option.map_or_else(Task::none, &f))
316    }
317}
318
319impl<T, E> Task<Result<T, E>> {
320    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
321    ///
322    /// The success value is provided to the closure to create the subsequent [`Task`].
323    pub fn and_then<A>(
324        self,
325        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
326    ) -> Task<A>
327    where
328        T: MaybeSend + 'static,
329        E: MaybeSend + 'static,
330        A: MaybeSend + 'static,
331    {
332        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
333    }
334}
335
336impl<T> From<()> for Task<T> {
337    fn from(_value: ()) -> Self {
338        Self::none()
339    }
340}
341
342/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
343/// its output.
344pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
345where
346    T: Send + 'static,
347{
348    channel(move |sender| {
349        let operation =
350            widget::operation::map(Box::new(operation), move |value| {
351                let _ = sender.clone().try_send(value);
352            });
353
354        Action::Widget(Box::new(operation))
355    })
356}
357
358/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
359/// produces the value fed to the [`oneshot::Sender`].
360pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
361where
362    T: MaybeSend + 'static,
363{
364    let (sender, receiver) = oneshot::channel();
365
366    let action = f(sender);
367
368    Task(Some(boxed_stream(
369        stream::once(async move { action }).chain(
370            receiver.into_stream().filter_map(|result| async move {
371                Some(Action::Output(result.ok()?))
372            }),
373        ),
374    )))
375}
376
377/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
378/// produces the values fed to the [`mpsc::Sender`].
379pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
380where
381    T: MaybeSend + 'static,
382{
383    let (sender, receiver) = mpsc::channel(1);
384
385    let action = f(sender);
386
387    Task(Some(boxed_stream(
388        stream::once(async move { action })
389            .chain(receiver.map(|result| Action::Output(result))),
390    )))
391}
392
393/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
394pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
395    let action = action.into();
396
397    Task(Some(boxed_stream(stream::once(async move {
398        action.output().expect_err("no output")
399    }))))
400}
401
402/// Returns the underlying [`Stream`] of the [`Task`].
403pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
404    task.0
405}