iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12use std::thread;
13
14#[cfg(feature = "sipper")]
15#[doc(no_inline)]
16pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
17
18/// A set of concurrent actions to be performed by the iced runtime.
19///
20/// A [`Task`] _may_ produce a bunch of values of type `T`.
21#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
22pub struct Task<T> {
23    stream: Option<BoxStream<Action<T>>>,
24    units: usize,
25}
26
27impl<T> Task<T> {
28    /// Creates a [`Task`] that does nothing.
29    pub fn none() -> Self {
30        Self {
31            stream: None,
32            units: 0,
33        }
34    }
35
36    /// Creates a new [`Task`] that instantly produces the given value.
37    pub fn done(value: T) -> Self
38    where
39        T: MaybeSend + 'static,
40    {
41        Self::future(future::ready(value))
42    }
43
44    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
45    /// output with the given closure.
46    pub fn perform<A>(
47        future: impl Future<Output = A> + MaybeSend + 'static,
48        f: impl FnOnce(A) -> T + MaybeSend + 'static,
49    ) -> Self
50    where
51        T: MaybeSend + 'static,
52        A: MaybeSend + 'static,
53    {
54        Self::future(future.map(f))
55    }
56
57    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
58    /// item with the given closure.
59    pub fn run<A>(
60        stream: impl Stream<Item = A> + MaybeSend + 'static,
61        f: impl Fn(A) -> T + MaybeSend + 'static,
62    ) -> Self
63    where
64        T: 'static,
65    {
66        Self::stream(stream.map(f))
67    }
68
69    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
70    /// progress with the first closure and the output with the second one.
71    #[cfg(feature = "sipper")]
72    pub fn sip<S>(
73        sipper: S,
74        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
75        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
76    ) -> Self
77    where
78        S: sipper::Core + MaybeSend + 'static,
79        T: MaybeSend + 'static,
80    {
81        Self::stream(stream(sipper::sipper(move |sender| async move {
82            on_output(sipper.with(on_progress).run(sender).await)
83        })))
84    }
85
86    /// Combines the given tasks and produces a single [`Task`] that will run all of them
87    /// in parallel.
88    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
89    where
90        T: 'static,
91    {
92        let mut select_all = stream::SelectAll::new();
93        let mut units = 0;
94
95        for task in tasks.into_iter() {
96            if let Some(stream) = task.stream {
97                select_all.push(stream);
98            }
99
100            units += task.units;
101        }
102
103        Self {
104            stream: Some(boxed_stream(select_all)),
105            units,
106        }
107    }
108
109    /// Maps the output of a [`Task`] with the given closure.
110    pub fn map<O>(
111        self,
112        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
113    ) -> Task<O>
114    where
115        T: MaybeSend + 'static,
116        O: MaybeSend + 'static,
117    {
118        self.then(move |output| Task::done(f(output)))
119    }
120
121    /// Performs a new [`Task`] for every output of the current [`Task`] using the
122    /// given closure.
123    ///
124    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
125    /// [`Stream`].
126    pub fn then<O>(
127        self,
128        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
129    ) -> Task<O>
130    where
131        T: MaybeSend + 'static,
132        O: MaybeSend + 'static,
133    {
134        Task {
135            stream: match self.stream {
136                None => None,
137                Some(stream) => {
138                    Some(boxed_stream(stream.flat_map(move |action| {
139                        match action.output() {
140                            Ok(output) => {
141                                f(output).stream.unwrap_or_else(|| {
142                                    boxed_stream(stream::empty())
143                                })
144                            }
145                            Err(action) => boxed_stream(stream::once(
146                                async move { action },
147                            )),
148                        }
149                    })))
150                }
151            },
152            units: self.units,
153        }
154    }
155
156    /// Chains a new [`Task`] to be performed once the current one finishes completely.
157    pub fn chain(self, task: Self) -> Self
158    where
159        T: 'static,
160    {
161        match self.stream {
162            None => task,
163            Some(first) => match task.stream {
164                None => Self {
165                    stream: Some(first),
166                    units: self.units,
167                },
168                Some(second) => Self {
169                    stream: Some(boxed_stream(first.chain(second))),
170                    units: self.units + task.units,
171                },
172            },
173        }
174    }
175
176    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
177    pub fn collect(self) -> Task<Vec<T>>
178    where
179        T: MaybeSend + 'static,
180    {
181        match self.stream {
182            None => Task::done(Vec::new()),
183            Some(stream) => Task {
184                stream: Some(boxed_stream(
185                    stream::unfold(
186                        (stream, Some(Vec::new())),
187                        move |(mut stream, outputs)| async move {
188                            let mut outputs = outputs?;
189
190                            let Some(action) = stream.next().await else {
191                                return Some((
192                                    Some(Action::Output(outputs)),
193                                    (stream, None),
194                                ));
195                            };
196
197                            match action.output() {
198                                Ok(output) => {
199                                    outputs.push(output);
200
201                                    Some((None, (stream, Some(outputs))))
202                                }
203                                Err(action) => Some((
204                                    Some(action),
205                                    (stream, Some(outputs)),
206                                )),
207                            }
208                        },
209                    )
210                    .filter_map(future::ready),
211                )),
212                units: self.units,
213            },
214        }
215    }
216
217    /// Creates a new [`Task`] that discards the result of the current one.
218    ///
219    /// Useful if you only care about the side effects of a [`Task`].
220    pub fn discard<O>(self) -> Task<O>
221    where
222        T: MaybeSend + 'static,
223        O: MaybeSend + 'static,
224    {
225        self.then(|_| Task::none())
226    }
227
228    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
229    pub fn abortable(self) -> (Self, Handle)
230    where
231        T: 'static,
232    {
233        let (stream, handle) = match self.stream {
234            Some(stream) => {
235                let (stream, handle) = stream::abortable(stream);
236
237                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
238            }
239            None => (
240                None,
241                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
242            ),
243        };
244
245        (
246            Self {
247                stream,
248                units: self.units,
249            },
250            Handle { internal: handle },
251        )
252    }
253
254    /// Creates a new [`Task`] that runs the given [`Future`] and produces
255    /// its output.
256    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
257    where
258        T: 'static,
259    {
260        Self::stream(stream::once(future))
261    }
262
263    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
264    /// each of its items.
265    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
266    where
267        T: 'static,
268    {
269        Self {
270            stream: Some(boxed_stream(stream.map(Action::Output))),
271            units: 1,
272        }
273    }
274
275    /// Returns the amount of work "units" of the [`Task`].
276    pub fn units(&self) -> usize {
277        self.units
278    }
279}
280
281impl<T> std::fmt::Debug for Task<T> {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
284            .field("units", &self.units)
285            .finish()
286    }
287}
288
289/// A handle to a [`Task`] that can be used for aborting it.
290#[derive(Debug, Clone)]
291pub struct Handle {
292    internal: InternalHandle,
293}
294
295#[derive(Debug, Clone)]
296enum InternalHandle {
297    Manual(stream::AbortHandle),
298    AbortOnDrop(Arc<stream::AbortHandle>),
299}
300
301impl InternalHandle {
302    pub fn as_ref(&self) -> &stream::AbortHandle {
303        match self {
304            InternalHandle::Manual(handle) => handle,
305            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
306        }
307    }
308}
309
310impl Handle {
311    /// Aborts the [`Task`] of this [`Handle`].
312    pub fn abort(&self) {
313        self.internal.as_ref().abort();
314    }
315
316    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
317    /// all of its instances are dropped.
318    ///
319    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
320    /// once all of the clones are dropped.
321    ///
322    /// This can be really useful if you do not want to worry about calling
323    /// [`Handle::abort`] yourself.
324    pub fn abort_on_drop(self) -> Self {
325        match &self.internal {
326            InternalHandle::Manual(handle) => Self {
327                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
328            },
329            InternalHandle::AbortOnDrop(_) => self,
330        }
331    }
332
333    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
334    pub fn is_aborted(&self) -> bool {
335        self.internal.as_ref().is_aborted()
336    }
337}
338
339impl Drop for Handle {
340    fn drop(&mut self) {
341        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
342            let handle = std::mem::replace(
343                handle,
344                Arc::new(stream::AbortHandle::new_pair().0),
345            );
346
347            if let Some(handle) = Arc::into_inner(handle) {
348                handle.abort();
349            }
350        }
351    }
352}
353
354impl<T> Task<Option<T>> {
355    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
356    ///
357    /// The value is provided to the closure to create the subsequent [`Task`].
358    pub fn and_then<A>(
359        self,
360        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
361    ) -> Task<A>
362    where
363        T: MaybeSend + 'static,
364        A: MaybeSend + 'static,
365    {
366        self.then(move |option| option.map_or_else(Task::none, &f))
367    }
368}
369
370impl<T, E> Task<Result<T, E>> {
371    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
372    ///
373    /// The success value is provided to the closure to create the subsequent [`Task`].
374    pub fn and_then<A>(
375        self,
376        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
377    ) -> Task<A>
378    where
379        T: MaybeSend + 'static,
380        E: MaybeSend + 'static,
381        A: MaybeSend + 'static,
382    {
383        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
384    }
385}
386
387impl<T> Default for Task<T> {
388    fn default() -> Self {
389        Self::none()
390    }
391}
392
393impl<T> From<()> for Task<T> {
394    fn from(_value: ()) -> Self {
395        Self::none()
396    }
397}
398
399/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
400/// its output.
401pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
402where
403    T: Send + 'static,
404{
405    channel(move |sender| {
406        let operation =
407            widget::operation::map(Box::new(operation), move |value| {
408                let _ = sender.clone().try_send(value);
409            });
410
411        Action::Widget(Box::new(operation))
412    })
413}
414
415/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
416/// produces the value fed to the [`oneshot::Sender`].
417pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
418where
419    T: MaybeSend + 'static,
420{
421    let (sender, receiver) = oneshot::channel();
422
423    let action = f(sender);
424
425    Task {
426        stream: Some(boxed_stream(stream::once(async move { action }).chain(
427            receiver.into_stream().filter_map(|result| async move {
428                Some(Action::Output(result.ok()?))
429            }),
430        ))),
431        units: 1,
432    }
433}
434
435/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
436/// produces the values fed to the [`mpsc::Sender`].
437pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
438where
439    T: MaybeSend + 'static,
440{
441    let (sender, receiver) = mpsc::channel(1);
442
443    let action = f(sender);
444
445    Task {
446        stream: Some(boxed_stream(
447            stream::once(async move { action })
448                .chain(receiver.map(|result| Action::Output(result))),
449        )),
450        units: 1,
451    }
452}
453
454/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
455pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
456    let action = action.into();
457
458    Task {
459        stream: Some(boxed_stream(stream::once(async move {
460            action.output().expect_err("no output")
461        }))),
462        units: 1,
463    }
464}
465
466/// Returns the underlying [`Stream`] of the [`Task`].
467pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
468    task.stream
469}
470
471/// Creates a new [`Task`] that will run the given closure in a new thread.
472///
473/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
474/// by the [`Task`].
475pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
476where
477    T: Send + 'static,
478{
479    let (sender, receiver) = mpsc::channel(1);
480
481    let _ = thread::spawn(move || {
482        f(sender);
483    });
484
485    Task::stream(receiver)
486}
487
488/// Creates a new [`Task`] that will run the given closure that can fail in a new
489/// thread.
490///
491/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
492/// by the [`Task`].
493pub fn try_blocking<T, E>(
494    f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
495) -> Task<Result<T, E>>
496where
497    T: Send + 'static,
498    E: Send + 'static,
499{
500    let (sender, receiver) = mpsc::channel(1);
501    let (error_sender, error_receiver) = oneshot::channel();
502
503    let _ = thread::spawn(move || {
504        if let Err(error) = f(sender) {
505            let _ = error_sender.send(Err(error));
506        }
507    });
508
509    Task::stream(stream::select(
510        receiver.map(Ok),
511        stream::once(error_receiver).filter_map(async |result| result.ok()),
512    ))
513}