iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12
13#[cfg(feature = "sipper")]
14#[doc(no_inline)]
15pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
16
17/// A set of concurrent actions to be performed by the iced runtime.
18///
19/// A [`Task`] _may_ produce a bunch of values of type `T`.
20#[allow(missing_debug_implementations)]
21#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
22pub struct Task<T> {
23    stream: Option<BoxStream<Action<T>>>,
24    units: usize,
25}
26
27impl<T> Task<T> {
28    /// Creates a [`Task`] that does nothing.
29    pub fn none() -> Self {
30        Self {
31            stream: None,
32            units: 0,
33        }
34    }
35
36    /// Creates a new [`Task`] that instantly produces the given value.
37    pub fn done(value: T) -> Self
38    where
39        T: MaybeSend + 'static,
40    {
41        Self::future(future::ready(value))
42    }
43
44    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
45    /// output with the given closure.
46    pub fn perform<A>(
47        future: impl Future<Output = A> + MaybeSend + 'static,
48        f: impl FnOnce(A) -> T + MaybeSend + 'static,
49    ) -> Self
50    where
51        T: MaybeSend + 'static,
52        A: MaybeSend + 'static,
53    {
54        Self::future(future.map(f))
55    }
56
57    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
58    /// item with the given closure.
59    pub fn run<A>(
60        stream: impl Stream<Item = A> + MaybeSend + 'static,
61        f: impl Fn(A) -> T + MaybeSend + 'static,
62    ) -> Self
63    where
64        T: 'static,
65    {
66        Self::stream(stream.map(f))
67    }
68
69    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
70    /// progress with the first closure and the output with the second one.
71    #[cfg(feature = "sipper")]
72    pub fn sip<S>(
73        sipper: S,
74        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
75        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
76    ) -> Self
77    where
78        S: sipper::Core + MaybeSend + 'static,
79        T: MaybeSend + 'static,
80    {
81        Self::stream(stream(sipper::sipper(move |sender| async move {
82            on_output(sipper.with(on_progress).run(sender).await)
83        })))
84    }
85
86    /// Combines the given tasks and produces a single [`Task`] that will run all of them
87    /// in parallel.
88    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
89    where
90        T: 'static,
91    {
92        let mut select_all = stream::SelectAll::new();
93        let mut units = 0;
94
95        for task in tasks.into_iter() {
96            if let Some(stream) = task.stream {
97                select_all.push(stream);
98            }
99
100            units += task.units;
101        }
102
103        Self {
104            stream: Some(boxed_stream(select_all)),
105            units,
106        }
107    }
108
109    /// Maps the output of a [`Task`] with the given closure.
110    pub fn map<O>(
111        self,
112        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
113    ) -> Task<O>
114    where
115        T: MaybeSend + 'static,
116        O: MaybeSend + 'static,
117    {
118        self.then(move |output| Task::done(f(output)))
119    }
120
121    /// Performs a new [`Task`] for every output of the current [`Task`] using the
122    /// given closure.
123    ///
124    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
125    /// [`Stream`].
126    pub fn then<O>(
127        self,
128        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
129    ) -> Task<O>
130    where
131        T: MaybeSend + 'static,
132        O: MaybeSend + 'static,
133    {
134        Task {
135            stream: match self.stream {
136                None => None,
137                Some(stream) => {
138                    Some(boxed_stream(stream.flat_map(move |action| {
139                        match action.output() {
140                            Ok(output) => {
141                                f(output).stream.unwrap_or_else(|| {
142                                    boxed_stream(stream::empty())
143                                })
144                            }
145                            Err(action) => boxed_stream(stream::once(
146                                async move { action },
147                            )),
148                        }
149                    })))
150                }
151            },
152            units: self.units,
153        }
154    }
155
156    /// Chains a new [`Task`] to be performed once the current one finishes completely.
157    pub fn chain(self, task: Self) -> Self
158    where
159        T: 'static,
160    {
161        match self.stream {
162            None => task,
163            Some(first) => match task.stream {
164                None => Self {
165                    stream: Some(first),
166                    units: self.units,
167                },
168                Some(second) => Self {
169                    stream: Some(boxed_stream(first.chain(second))),
170                    units: self.units + task.units,
171                },
172            },
173        }
174    }
175
176    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
177    pub fn collect(self) -> Task<Vec<T>>
178    where
179        T: MaybeSend + 'static,
180    {
181        match self.stream {
182            None => Task::done(Vec::new()),
183            Some(stream) => Task {
184                stream: Some(boxed_stream(
185                    stream::unfold(
186                        (stream, Some(Vec::new())),
187                        move |(mut stream, outputs)| async move {
188                            let mut outputs = outputs?;
189
190                            let Some(action) = stream.next().await else {
191                                return Some((
192                                    Some(Action::Output(outputs)),
193                                    (stream, None),
194                                ));
195                            };
196
197                            match action.output() {
198                                Ok(output) => {
199                                    outputs.push(output);
200
201                                    Some((None, (stream, Some(outputs))))
202                                }
203                                Err(action) => Some((
204                                    Some(action),
205                                    (stream, Some(outputs)),
206                                )),
207                            }
208                        },
209                    )
210                    .filter_map(future::ready),
211                )),
212                units: self.units,
213            },
214        }
215    }
216
217    /// Creates a new [`Task`] that discards the result of the current one.
218    ///
219    /// Useful if you only care about the side effects of a [`Task`].
220    pub fn discard<O>(self) -> Task<O>
221    where
222        T: MaybeSend + 'static,
223        O: MaybeSend + 'static,
224    {
225        self.then(|_| Task::none())
226    }
227
228    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
229    pub fn abortable(self) -> (Self, Handle)
230    where
231        T: 'static,
232    {
233        let (stream, handle) = match self.stream {
234            Some(stream) => {
235                let (stream, handle) = stream::abortable(stream);
236
237                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
238            }
239            None => (
240                None,
241                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
242            ),
243        };
244
245        (
246            Self {
247                stream,
248                units: self.units,
249            },
250            Handle { internal: handle },
251        )
252    }
253
254    /// Creates a new [`Task`] that runs the given [`Future`] and produces
255    /// its output.
256    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
257    where
258        T: 'static,
259    {
260        Self::stream(stream::once(future))
261    }
262
263    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
264    /// each of its items.
265    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
266    where
267        T: 'static,
268    {
269        Self {
270            stream: Some(boxed_stream(stream.map(Action::Output))),
271            units: 1,
272        }
273    }
274
275    /// Returns the amount of work "units" of the [`Task`].
276    pub fn units(&self) -> usize {
277        self.units
278    }
279}
280
281/// A handle to a [`Task`] that can be used for aborting it.
282#[derive(Debug, Clone)]
283pub struct Handle {
284    internal: InternalHandle,
285}
286
287#[derive(Debug, Clone)]
288enum InternalHandle {
289    Manual(stream::AbortHandle),
290    AbortOnDrop(Arc<stream::AbortHandle>),
291}
292
293impl InternalHandle {
294    pub fn as_ref(&self) -> &stream::AbortHandle {
295        match self {
296            InternalHandle::Manual(handle) => handle,
297            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
298        }
299    }
300}
301
302impl Handle {
303    /// Aborts the [`Task`] of this [`Handle`].
304    pub fn abort(&self) {
305        self.internal.as_ref().abort();
306    }
307
308    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
309    /// all of its instances are dropped.
310    ///
311    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
312    /// once all of the clones are dropped.
313    ///
314    /// This can be really useful if you do not want to worry about calling
315    /// [`Handle::abort`] yourself.
316    pub fn abort_on_drop(self) -> Self {
317        match &self.internal {
318            InternalHandle::Manual(handle) => Self {
319                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
320            },
321            InternalHandle::AbortOnDrop(_) => self,
322        }
323    }
324
325    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
326    pub fn is_aborted(&self) -> bool {
327        self.internal.as_ref().is_aborted()
328    }
329}
330
331impl Drop for Handle {
332    fn drop(&mut self) {
333        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
334            let handle = std::mem::replace(
335                handle,
336                Arc::new(stream::AbortHandle::new_pair().0),
337            );
338
339            if let Some(handle) = Arc::into_inner(handle) {
340                handle.abort();
341            }
342        }
343    }
344}
345
346impl<T> Task<Option<T>> {
347    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
348    ///
349    /// The value is provided to the closure to create the subsequent [`Task`].
350    pub fn and_then<A>(
351        self,
352        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
353    ) -> Task<A>
354    where
355        T: MaybeSend + 'static,
356        A: MaybeSend + 'static,
357    {
358        self.then(move |option| option.map_or_else(Task::none, &f))
359    }
360}
361
362impl<T, E> Task<Result<T, E>> {
363    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
364    ///
365    /// The success value is provided to the closure to create the subsequent [`Task`].
366    pub fn and_then<A>(
367        self,
368        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
369    ) -> Task<A>
370    where
371        T: MaybeSend + 'static,
372        E: MaybeSend + 'static,
373        A: MaybeSend + 'static,
374    {
375        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
376    }
377}
378
379impl<T> From<()> for Task<T> {
380    fn from(_value: ()) -> Self {
381        Self::none()
382    }
383}
384
385/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
386/// its output.
387pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
388where
389    T: Send + 'static,
390{
391    channel(move |sender| {
392        let operation =
393            widget::operation::map(Box::new(operation), move |value| {
394                let _ = sender.clone().try_send(value);
395            });
396
397        Action::Widget(Box::new(operation))
398    })
399}
400
401/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
402/// produces the value fed to the [`oneshot::Sender`].
403pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
404where
405    T: MaybeSend + 'static,
406{
407    let (sender, receiver) = oneshot::channel();
408
409    let action = f(sender);
410
411    Task {
412        stream: Some(boxed_stream(stream::once(async move { action }).chain(
413            receiver.into_stream().filter_map(|result| async move {
414                Some(Action::Output(result.ok()?))
415            }),
416        ))),
417        units: 1,
418    }
419}
420
421/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
422/// produces the values fed to the [`mpsc::Sender`].
423pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
424where
425    T: MaybeSend + 'static,
426{
427    let (sender, receiver) = mpsc::channel(1);
428
429    let action = f(sender);
430
431    Task {
432        stream: Some(boxed_stream(
433            stream::once(async move { action })
434                .chain(receiver.map(|result| Action::Output(result))),
435        )),
436        units: 1,
437    }
438}
439
440/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
441pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
442    let action = action.into();
443
444    Task {
445        stream: Some(boxed_stream(stream::once(async move {
446            action.output().expect_err("no output")
447        }))),
448        units: 1,
449    }
450}
451
452/// Returns the underlying [`Stream`] of the [`Task`].
453pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
454    task.stream
455}