iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12
13#[cfg(feature = "sipper")]
14#[doc(no_inline)]
15pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
16
17/// A set of concurrent actions to be performed by the iced runtime.
18///
19/// A [`Task`] _may_ produce a bunch of values of type `T`.
20#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
21pub struct Task<T> {
22    stream: Option<BoxStream<Action<T>>>,
23    units: usize,
24}
25
26impl<T> Task<T> {
27    /// Creates a [`Task`] that does nothing.
28    pub fn none() -> Self {
29        Self {
30            stream: None,
31            units: 0,
32        }
33    }
34
35    /// Creates a new [`Task`] that instantly produces the given value.
36    pub fn done(value: T) -> Self
37    where
38        T: MaybeSend + 'static,
39    {
40        Self::future(future::ready(value))
41    }
42
43    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
44    /// output with the given closure.
45    pub fn perform<A>(
46        future: impl Future<Output = A> + MaybeSend + 'static,
47        f: impl FnOnce(A) -> T + MaybeSend + 'static,
48    ) -> Self
49    where
50        T: MaybeSend + 'static,
51        A: MaybeSend + 'static,
52    {
53        Self::future(future.map(f))
54    }
55
56    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
57    /// item with the given closure.
58    pub fn run<A>(
59        stream: impl Stream<Item = A> + MaybeSend + 'static,
60        f: impl Fn(A) -> T + MaybeSend + 'static,
61    ) -> Self
62    where
63        T: 'static,
64    {
65        Self::stream(stream.map(f))
66    }
67
68    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
69    /// progress with the first closure and the output with the second one.
70    #[cfg(feature = "sipper")]
71    pub fn sip<S>(
72        sipper: S,
73        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
74        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
75    ) -> Self
76    where
77        S: sipper::Core + MaybeSend + 'static,
78        T: MaybeSend + 'static,
79    {
80        Self::stream(stream(sipper::sipper(move |sender| async move {
81            on_output(sipper.with(on_progress).run(sender).await)
82        })))
83    }
84
85    /// Combines the given tasks and produces a single [`Task`] that will run all of them
86    /// in parallel.
87    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
88    where
89        T: 'static,
90    {
91        let mut select_all = stream::SelectAll::new();
92        let mut units = 0;
93
94        for task in tasks.into_iter() {
95            if let Some(stream) = task.stream {
96                select_all.push(stream);
97            }
98
99            units += task.units;
100        }
101
102        Self {
103            stream: Some(boxed_stream(select_all)),
104            units,
105        }
106    }
107
108    /// Maps the output of a [`Task`] with the given closure.
109    pub fn map<O>(
110        self,
111        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
112    ) -> Task<O>
113    where
114        T: MaybeSend + 'static,
115        O: MaybeSend + 'static,
116    {
117        self.then(move |output| Task::done(f(output)))
118    }
119
120    /// Performs a new [`Task`] for every output of the current [`Task`] using the
121    /// given closure.
122    ///
123    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
124    /// [`Stream`].
125    pub fn then<O>(
126        self,
127        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
128    ) -> Task<O>
129    where
130        T: MaybeSend + 'static,
131        O: MaybeSend + 'static,
132    {
133        Task {
134            stream: match self.stream {
135                None => None,
136                Some(stream) => {
137                    Some(boxed_stream(stream.flat_map(move |action| {
138                        match action.output() {
139                            Ok(output) => {
140                                f(output).stream.unwrap_or_else(|| {
141                                    boxed_stream(stream::empty())
142                                })
143                            }
144                            Err(action) => boxed_stream(stream::once(
145                                async move { action },
146                            )),
147                        }
148                    })))
149                }
150            },
151            units: self.units,
152        }
153    }
154
155    /// Chains a new [`Task`] to be performed once the current one finishes completely.
156    pub fn chain(self, task: Self) -> Self
157    where
158        T: 'static,
159    {
160        match self.stream {
161            None => task,
162            Some(first) => match task.stream {
163                None => Self {
164                    stream: Some(first),
165                    units: self.units,
166                },
167                Some(second) => Self {
168                    stream: Some(boxed_stream(first.chain(second))),
169                    units: self.units + task.units,
170                },
171            },
172        }
173    }
174
175    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
176    pub fn collect(self) -> Task<Vec<T>>
177    where
178        T: MaybeSend + 'static,
179    {
180        match self.stream {
181            None => Task::done(Vec::new()),
182            Some(stream) => Task {
183                stream: Some(boxed_stream(
184                    stream::unfold(
185                        (stream, Some(Vec::new())),
186                        move |(mut stream, outputs)| async move {
187                            let mut outputs = outputs?;
188
189                            let Some(action) = stream.next().await else {
190                                return Some((
191                                    Some(Action::Output(outputs)),
192                                    (stream, None),
193                                ));
194                            };
195
196                            match action.output() {
197                                Ok(output) => {
198                                    outputs.push(output);
199
200                                    Some((None, (stream, Some(outputs))))
201                                }
202                                Err(action) => Some((
203                                    Some(action),
204                                    (stream, Some(outputs)),
205                                )),
206                            }
207                        },
208                    )
209                    .filter_map(future::ready),
210                )),
211                units: self.units,
212            },
213        }
214    }
215
216    /// Creates a new [`Task`] that discards the result of the current one.
217    ///
218    /// Useful if you only care about the side effects of a [`Task`].
219    pub fn discard<O>(self) -> Task<O>
220    where
221        T: MaybeSend + 'static,
222        O: MaybeSend + 'static,
223    {
224        self.then(|_| Task::none())
225    }
226
227    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
228    pub fn abortable(self) -> (Self, Handle)
229    where
230        T: 'static,
231    {
232        let (stream, handle) = match self.stream {
233            Some(stream) => {
234                let (stream, handle) = stream::abortable(stream);
235
236                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
237            }
238            None => (
239                None,
240                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
241            ),
242        };
243
244        (
245            Self {
246                stream,
247                units: self.units,
248            },
249            Handle { internal: handle },
250        )
251    }
252
253    /// Creates a new [`Task`] that runs the given [`Future`] and produces
254    /// its output.
255    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
256    where
257        T: 'static,
258    {
259        Self::stream(stream::once(future))
260    }
261
262    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
263    /// each of its items.
264    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
265    where
266        T: 'static,
267    {
268        Self {
269            stream: Some(boxed_stream(stream.map(Action::Output))),
270            units: 1,
271        }
272    }
273
274    /// Returns the amount of work "units" of the [`Task`].
275    pub fn units(&self) -> usize {
276        self.units
277    }
278}
279
280impl<T> std::fmt::Debug for Task<T> {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
283            .field("units", &self.units)
284            .finish()
285    }
286}
287
288/// A handle to a [`Task`] that can be used for aborting it.
289#[derive(Debug, Clone)]
290pub struct Handle {
291    internal: InternalHandle,
292}
293
294#[derive(Debug, Clone)]
295enum InternalHandle {
296    Manual(stream::AbortHandle),
297    AbortOnDrop(Arc<stream::AbortHandle>),
298}
299
300impl InternalHandle {
301    pub fn as_ref(&self) -> &stream::AbortHandle {
302        match self {
303            InternalHandle::Manual(handle) => handle,
304            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
305        }
306    }
307}
308
309impl Handle {
310    /// Aborts the [`Task`] of this [`Handle`].
311    pub fn abort(&self) {
312        self.internal.as_ref().abort();
313    }
314
315    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
316    /// all of its instances are dropped.
317    ///
318    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
319    /// once all of the clones are dropped.
320    ///
321    /// This can be really useful if you do not want to worry about calling
322    /// [`Handle::abort`] yourself.
323    pub fn abort_on_drop(self) -> Self {
324        match &self.internal {
325            InternalHandle::Manual(handle) => Self {
326                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
327            },
328            InternalHandle::AbortOnDrop(_) => self,
329        }
330    }
331
332    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
333    pub fn is_aborted(&self) -> bool {
334        self.internal.as_ref().is_aborted()
335    }
336}
337
338impl Drop for Handle {
339    fn drop(&mut self) {
340        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
341            let handle = std::mem::replace(
342                handle,
343                Arc::new(stream::AbortHandle::new_pair().0),
344            );
345
346            if let Some(handle) = Arc::into_inner(handle) {
347                handle.abort();
348            }
349        }
350    }
351}
352
353impl<T> Task<Option<T>> {
354    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
355    ///
356    /// The value is provided to the closure to create the subsequent [`Task`].
357    pub fn and_then<A>(
358        self,
359        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
360    ) -> Task<A>
361    where
362        T: MaybeSend + 'static,
363        A: MaybeSend + 'static,
364    {
365        self.then(move |option| option.map_or_else(Task::none, &f))
366    }
367}
368
369impl<T, E> Task<Result<T, E>> {
370    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
371    ///
372    /// The success value is provided to the closure to create the subsequent [`Task`].
373    pub fn and_then<A>(
374        self,
375        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
376    ) -> Task<A>
377    where
378        T: MaybeSend + 'static,
379        E: MaybeSend + 'static,
380        A: MaybeSend + 'static,
381    {
382        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
383    }
384}
385
386impl<T> From<()> for Task<T> {
387    fn from(_value: ()) -> Self {
388        Self::none()
389    }
390}
391
392/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
393/// its output.
394pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
395where
396    T: Send + 'static,
397{
398    channel(move |sender| {
399        let operation =
400            widget::operation::map(Box::new(operation), move |value| {
401                let _ = sender.clone().try_send(value);
402            });
403
404        Action::Widget(Box::new(operation))
405    })
406}
407
408/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
409/// produces the value fed to the [`oneshot::Sender`].
410pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
411where
412    T: MaybeSend + 'static,
413{
414    let (sender, receiver) = oneshot::channel();
415
416    let action = f(sender);
417
418    Task {
419        stream: Some(boxed_stream(stream::once(async move { action }).chain(
420            receiver.into_stream().filter_map(|result| async move {
421                Some(Action::Output(result.ok()?))
422            }),
423        ))),
424        units: 1,
425    }
426}
427
428/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
429/// produces the values fed to the [`mpsc::Sender`].
430pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
431where
432    T: MaybeSend + 'static,
433{
434    let (sender, receiver) = mpsc::channel(1);
435
436    let action = f(sender);
437
438    Task {
439        stream: Some(boxed_stream(
440            stream::once(async move { action })
441                .chain(receiver.map(|result| Action::Output(result))),
442        )),
443        units: 1,
444    }
445}
446
447/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
448pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
449    let action = action.into();
450
451    Task {
452        stream: Some(boxed_stream(stream::once(async move {
453            action.output().expect_err("no output")
454        }))),
455        units: 1,
456    }
457}
458
459/// Returns the underlying [`Stream`] of the [`Task`].
460pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
461    task.stream
462}