iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
20/// A set of concurrent actions to be performed by the iced runtime.
21///
22/// A [`Task`] _may_ produce a bunch of values of type `T`.
23#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
24pub struct Task<T> {
25    stream: Option<BoxStream<Action<T>>>,
26    units: usize,
27}
28
29impl<T> Task<T> {
30    /// Creates a [`Task`] that does nothing.
31    pub fn none() -> Self {
32        Self {
33            stream: None,
34            units: 0,
35        }
36    }
37
38    /// Creates a new [`Task`] that instantly produces the given value.
39    pub fn done(value: T) -> Self
40    where
41        T: MaybeSend + 'static,
42    {
43        Self::future(future::ready(value))
44    }
45
46    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
47    /// output with the given closure.
48    pub fn perform<A>(
49        future: impl Future<Output = A> + MaybeSend + 'static,
50        f: impl FnOnce(A) -> T + MaybeSend + 'static,
51    ) -> Self
52    where
53        T: MaybeSend + 'static,
54        A: MaybeSend + 'static,
55    {
56        Self::future(future.map(f))
57    }
58
59    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
60    /// item with the given closure.
61    pub fn run<A>(
62        stream: impl Stream<Item = A> + MaybeSend + 'static,
63        f: impl Fn(A) -> T + MaybeSend + 'static,
64    ) -> Self
65    where
66        T: 'static,
67    {
68        Self::stream(stream.map(f))
69    }
70
71    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
72    /// progress with the first closure and the output with the second one.
73    #[cfg(feature = "sipper")]
74    pub fn sip<S>(
75        sipper: S,
76        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
77        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
78    ) -> Self
79    where
80        S: sipper::Core + MaybeSend + 'static,
81        T: MaybeSend + 'static,
82    {
83        Self::stream(stream(sipper::sipper(move |sender| async move {
84            on_output(sipper.with(on_progress).run(sender).await)
85        })))
86    }
87
88    /// Combines the given tasks and produces a single [`Task`] that will run all of them
89    /// in parallel.
90    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
91    where
92        T: 'static,
93    {
94        let mut select_all = stream::SelectAll::new();
95        let mut units = 0;
96
97        for task in tasks.into_iter() {
98            if let Some(stream) = task.stream {
99                select_all.push(stream);
100            }
101
102            units += task.units;
103        }
104
105        Self {
106            stream: Some(boxed_stream(select_all)),
107            units,
108        }
109    }
110
111    /// Maps the output of a [`Task`] with the given closure.
112    pub fn map<O>(
113        self,
114        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
115    ) -> Task<O>
116    where
117        T: MaybeSend + 'static,
118        O: MaybeSend + 'static,
119    {
120        self.then(move |output| Task::done(f(output)))
121    }
122
123    /// Performs a new [`Task`] for every output of the current [`Task`] using the
124    /// given closure.
125    ///
126    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
127    /// [`Stream`].
128    pub fn then<O>(
129        self,
130        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
131    ) -> Task<O>
132    where
133        T: MaybeSend + 'static,
134        O: MaybeSend + 'static,
135    {
136        Task {
137            stream: match self.stream {
138                None => None,
139                Some(stream) => {
140                    Some(boxed_stream(stream.flat_map(move |action| {
141                        match action.output() {
142                            Ok(output) => {
143                                f(output).stream.unwrap_or_else(|| {
144                                    boxed_stream(stream::empty())
145                                })
146                            }
147                            Err(action) => boxed_stream(stream::once(
148                                async move { action },
149                            )),
150                        }
151                    })))
152                }
153            },
154            units: self.units,
155        }
156    }
157
158    /// Chains a new [`Task`] to be performed once the current one finishes completely.
159    pub fn chain(self, task: Self) -> Self
160    where
161        T: 'static,
162    {
163        match self.stream {
164            None => task,
165            Some(first) => match task.stream {
166                None => Self {
167                    stream: Some(first),
168                    units: self.units,
169                },
170                Some(second) => Self {
171                    stream: Some(boxed_stream(first.chain(second))),
172                    units: self.units + task.units,
173                },
174            },
175        }
176    }
177
178    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
179    pub fn collect(self) -> Task<Vec<T>>
180    where
181        T: MaybeSend + 'static,
182    {
183        match self.stream {
184            None => Task::done(Vec::new()),
185            Some(stream) => Task {
186                stream: Some(boxed_stream(
187                    stream::unfold(
188                        (stream, Some(Vec::new())),
189                        move |(mut stream, outputs)| async move {
190                            let mut outputs = outputs?;
191
192                            let Some(action) = stream.next().await else {
193                                return Some((
194                                    Some(Action::Output(outputs)),
195                                    (stream, None),
196                                ));
197                            };
198
199                            match action.output() {
200                                Ok(output) => {
201                                    outputs.push(output);
202
203                                    Some((None, (stream, Some(outputs))))
204                                }
205                                Err(action) => Some((
206                                    Some(action),
207                                    (stream, Some(outputs)),
208                                )),
209                            }
210                        },
211                    )
212                    .filter_map(future::ready),
213                )),
214                units: self.units,
215            },
216        }
217    }
218
219    /// Creates a new [`Task`] that discards the result of the current one.
220    ///
221    /// Useful if you only care about the side effects of a [`Task`].
222    pub fn discard<O>(self) -> Task<O>
223    where
224        T: MaybeSend + 'static,
225        O: MaybeSend + 'static,
226    {
227        self.then(|_| Task::none())
228    }
229
230    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
231    pub fn abortable(self) -> (Self, Handle)
232    where
233        T: 'static,
234    {
235        let (stream, handle) = match self.stream {
236            Some(stream) => {
237                let (stream, handle) = stream::abortable(stream);
238
239                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
240            }
241            None => (
242                None,
243                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
244            ),
245        };
246
247        (
248            Self {
249                stream,
250                units: self.units,
251            },
252            Handle { internal: handle },
253        )
254    }
255
256    /// Creates a new [`Task`] that runs the given [`Future`] and produces
257    /// its output.
258    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
259    where
260        T: 'static,
261    {
262        Self::stream(stream::once(future))
263    }
264
265    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
266    /// each of its items.
267    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
268    where
269        T: 'static,
270    {
271        Self {
272            stream: Some(boxed_stream(
273                stream::once(yield_now())
274                    .filter_map(|_| async { None })
275                    .chain(stream.map(Action::Output)),
276            )),
277            units: 1,
278        }
279    }
280
281    /// Returns the amount of work "units" of the [`Task`].
282    pub fn units(&self) -> usize {
283        self.units
284    }
285}
286
287impl<T> std::fmt::Debug for Task<T> {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
290            .field("units", &self.units)
291            .finish()
292    }
293}
294
295/// A handle to a [`Task`] that can be used for aborting it.
296#[derive(Debug, Clone)]
297pub struct Handle {
298    internal: InternalHandle,
299}
300
301#[derive(Debug, Clone)]
302enum InternalHandle {
303    Manual(stream::AbortHandle),
304    AbortOnDrop(Arc<stream::AbortHandle>),
305}
306
307impl InternalHandle {
308    pub fn as_ref(&self) -> &stream::AbortHandle {
309        match self {
310            InternalHandle::Manual(handle) => handle,
311            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
312        }
313    }
314}
315
316impl Handle {
317    /// Aborts the [`Task`] of this [`Handle`].
318    pub fn abort(&self) {
319        self.internal.as_ref().abort();
320    }
321
322    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
323    /// all of its instances are dropped.
324    ///
325    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
326    /// once all of the clones are dropped.
327    ///
328    /// This can be really useful if you do not want to worry about calling
329    /// [`Handle::abort`] yourself.
330    pub fn abort_on_drop(self) -> Self {
331        match &self.internal {
332            InternalHandle::Manual(handle) => Self {
333                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
334            },
335            InternalHandle::AbortOnDrop(_) => self,
336        }
337    }
338
339    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
340    pub fn is_aborted(&self) -> bool {
341        self.internal.as_ref().is_aborted()
342    }
343}
344
345impl Drop for Handle {
346    fn drop(&mut self) {
347        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
348            let handle = std::mem::replace(
349                handle,
350                Arc::new(stream::AbortHandle::new_pair().0),
351            );
352
353            if let Some(handle) = Arc::into_inner(handle) {
354                handle.abort();
355            }
356        }
357    }
358}
359
360impl<T> Task<Option<T>> {
361    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
362    ///
363    /// The value is provided to the closure to create the subsequent [`Task`].
364    pub fn and_then<A>(
365        self,
366        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
367    ) -> Task<A>
368    where
369        T: MaybeSend + 'static,
370        A: MaybeSend + 'static,
371    {
372        self.then(move |option| option.map_or_else(Task::none, &f))
373    }
374}
375
376impl<T, E> Task<Result<T, E>> {
377    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
378    ///
379    /// The success value is provided to the closure to create the subsequent [`Task`].
380    pub fn and_then<A>(
381        self,
382        f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
383    ) -> Task<Result<A, E>>
384    where
385        T: MaybeSend + 'static,
386        E: MaybeSend + 'static,
387        A: MaybeSend + 'static,
388    {
389        self.then(move |result| {
390            result.map_or_else(|error| Task::done(Err(error)), &f)
391        })
392    }
393
394    /// Maps the error type of this [`Task`] to a different one using the given
395    /// function.
396    pub fn map_err<E2>(
397        self,
398        f: impl Fn(E) -> E2 + MaybeSend + 'static,
399    ) -> Task<Result<T, E2>>
400    where
401        T: MaybeSend + 'static,
402        E: MaybeSend + 'static,
403        E2: MaybeSend + 'static,
404    {
405        self.map(move |result| result.map_err(&f))
406    }
407}
408
409impl<T> Default for Task<T> {
410    fn default() -> Self {
411        Self::none()
412    }
413}
414
415impl<T> From<()> for Task<T> {
416    fn from(_value: ()) -> Self {
417        Self::none()
418    }
419}
420
421/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
422/// its output.
423pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
424where
425    T: Send + 'static,
426{
427    channel(move |sender| {
428        let operation =
429            widget::operation::map(Box::new(operation), move |value| {
430                let _ = sender.clone().try_send(value);
431            });
432
433        Action::Widget(Box::new(operation))
434    })
435}
436
437/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
438/// produces the value fed to the [`oneshot::Sender`].
439pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
440where
441    T: MaybeSend + 'static,
442{
443    let (sender, receiver) = oneshot::channel();
444
445    let action = f(sender);
446
447    Task {
448        stream: Some(boxed_stream(stream::once(async move { action }).chain(
449            receiver.into_stream().filter_map(|result| async move {
450                Some(Action::Output(result.ok()?))
451            }),
452        ))),
453        units: 1,
454    }
455}
456
457/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
458/// produces the values fed to the [`mpsc::Sender`].
459pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
460where
461    T: MaybeSend + 'static,
462{
463    let (sender, receiver) = mpsc::channel(1);
464
465    let action = f(sender);
466
467    Task {
468        stream: Some(boxed_stream(
469            stream::once(async move { action })
470                .chain(receiver.map(|result| Action::Output(result))),
471        )),
472        units: 1,
473    }
474}
475
476/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
477pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
478    let action = action.into();
479
480    Task {
481        stream: Some(boxed_stream(stream::once(async move {
482            action.output().expect_err("no output")
483        }))),
484        units: 1,
485    }
486}
487
488/// Returns the underlying [`Stream`] of the [`Task`].
489pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
490    task.stream
491}
492
493/// Creates a new [`Task`] that will run the given closure in a new thread.
494///
495/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
496/// by the [`Task`].
497pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
498where
499    T: Send + 'static,
500{
501    let (sender, receiver) = mpsc::channel(1);
502
503    let _ = thread::spawn(move || {
504        f(sender);
505    });
506
507    Task::stream(receiver)
508}
509
510/// Creates a new [`Task`] that will run the given closure that can fail in a new
511/// thread.
512///
513/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
514/// by the [`Task`].
515pub fn try_blocking<T, E>(
516    f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
517) -> Task<Result<T, E>>
518where
519    T: Send + 'static,
520    E: Send + 'static,
521{
522    let (sender, receiver) = mpsc::channel(1);
523    let (error_sender, error_receiver) = oneshot::channel();
524
525    let _ = thread::spawn(move || {
526        if let Err(error) = f(sender) {
527            let _ = error_sender.send(Err(error));
528        }
529    });
530
531    Task::stream(stream::select(
532        receiver.map(Ok),
533        stream::once(error_receiver).filter_map(async |result| result.ok()),
534    ))
535}
536
// Yields control back to the executor exactly once.
//
// The first poll wakes the task again and returns `Pending`; the next poll
// completes.
async fn yield_now() {
    // Future that is pending on its first poll and ready afterwards.
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(
            mut self: Pin<&mut Self>,
            cx: &mut task::Context<'_>,
        ) -> task::Poll<()> {
            // Mark as yielded and look at the previous state in one step.
            let already_yielded = std::mem::replace(&mut self.yielded, true);

            if already_yielded {
                task::Poll::Ready(())
            } else {
                // Ask to be polled again right away, then give up the turn.
                cx.waker().wake_by_ref();

                task::Poll::Pending
            }
        }
    }

    YieldNow { yielded: false }.await;
}