Skip to main content

iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
20/// A set of concurrent actions to be performed by the iced runtime.
21///
22/// A [`Task`] _may_ produce a bunch of values of type `T`.
23#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
24pub struct Task<T> {
25    stream: Option<BoxStream<Action<T>>>,
26    units: usize,
27}
28
29impl<T> Task<T> {
30    /// Creates a [`Task`] that does nothing.
31    pub fn none() -> Self {
32        Self {
33            stream: None,
34            units: 0,
35        }
36    }
37
38    /// Creates a new [`Task`] that instantly produces the given value.
39    pub fn done(value: T) -> Self
40    where
41        T: MaybeSend + 'static,
42    {
43        Self {
44            stream: Some(boxed_stream(stream::once(future::ready(Action::Output(
45                value,
46            ))))),
47            units: 0,
48        }
49    }
50
51    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
52    /// output with the given closure.
53    pub fn perform<A>(
54        future: impl Future<Output = A> + MaybeSend + 'static,
55        f: impl FnOnce(A) -> T + MaybeSend + 'static,
56    ) -> Self
57    where
58        T: MaybeSend + 'static,
59        A: MaybeSend + 'static,
60    {
61        Self::future(future.map(f))
62    }
63
64    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
65    /// item with the given closure.
66    pub fn run<A>(
67        stream: impl Stream<Item = A> + MaybeSend + 'static,
68        f: impl Fn(A) -> T + MaybeSend + 'static,
69    ) -> Self
70    where
71        T: 'static,
72    {
73        Self::stream(stream.map(f))
74    }
75
76    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
77    /// progress with the first closure and the output with the second one.
78    #[cfg(feature = "sipper")]
79    pub fn sip<S>(
80        sipper: S,
81        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
82        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
83    ) -> Self
84    where
85        S: sipper::Core + MaybeSend + 'static,
86        T: MaybeSend + 'static,
87    {
88        Self::stream(stream(sipper::sipper(move |sender| async move {
89            on_output(sipper.with(on_progress).run(sender).await)
90        })))
91    }
92
93    /// Combines the given tasks and produces a single [`Task`] that will run all of them
94    /// in parallel.
95    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
96    where
97        T: 'static,
98    {
99        let mut select_all = stream::SelectAll::new();
100        let mut units = 0;
101
102        for task in tasks.into_iter() {
103            if let Some(stream) = task.stream {
104                select_all.push(stream);
105            }
106
107            units += task.units;
108        }
109
110        Self {
111            stream: Some(boxed_stream(select_all)),
112            units,
113        }
114    }
115
116    /// Maps the output of a [`Task`] with the given closure.
117    pub fn map<O>(self, mut f: impl FnMut(T) -> O + MaybeSend + 'static) -> Task<O>
118    where
119        T: MaybeSend + 'static,
120        O: MaybeSend + 'static,
121    {
122        self.then(move |output| Task::done(f(output)))
123    }
124
125    /// Performs a new [`Task`] for every output of the current [`Task`] using the
126    /// given closure.
127    ///
128    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
129    /// [`Stream`].
130    pub fn then<O>(self, mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static) -> Task<O>
131    where
132        T: MaybeSend + 'static,
133        O: MaybeSend + 'static,
134    {
135        Task {
136            stream: match self.stream {
137                None => None,
138                Some(stream) => Some(boxed_stream(stream.flat_map(move |action| {
139                    match action.output() {
140                        Ok(output) => f(output)
141                            .stream
142                            .unwrap_or_else(|| boxed_stream(stream::empty())),
143                        Err(action) => boxed_stream(stream::once(async move { action })),
144                    }
145                }))),
146            },
147            units: self.units,
148        }
149    }
150
151    /// Chains a new [`Task`] to be performed once the current one finishes completely.
152    pub fn chain(self, task: Self) -> Self
153    where
154        T: 'static,
155    {
156        match self.stream {
157            None => task,
158            Some(first) => match task.stream {
159                None => Self {
160                    stream: Some(first),
161                    units: self.units,
162                },
163                Some(second) => Self {
164                    stream: Some(boxed_stream(first.chain(second))),
165                    units: self.units + task.units,
166                },
167            },
168        }
169    }
170
171    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
172    pub fn collect(self) -> Task<Vec<T>>
173    where
174        T: MaybeSend + 'static,
175    {
176        match self.stream {
177            None => Task::done(Vec::new()),
178            Some(stream) => Task {
179                stream: Some(boxed_stream(
180                    stream::unfold(
181                        (stream, Some(Vec::new())),
182                        move |(mut stream, outputs)| async move {
183                            let mut outputs = outputs?;
184
185                            let Some(action) = stream.next().await else {
186                                return Some((Some(Action::Output(outputs)), (stream, None)));
187                            };
188
189                            match action.output() {
190                                Ok(output) => {
191                                    outputs.push(output);
192
193                                    Some((None, (stream, Some(outputs))))
194                                }
195                                Err(action) => Some((Some(action), (stream, Some(outputs)))),
196                            }
197                        },
198                    )
199                    .filter_map(future::ready),
200                )),
201                units: self.units,
202            },
203        }
204    }
205
206    /// Creates a new [`Task`] that discards the result of the current one.
207    ///
208    /// Useful if you only care about the side effects of a [`Task`].
209    pub fn discard<O>(self) -> Task<O>
210    where
211        T: MaybeSend + 'static,
212        O: MaybeSend + 'static,
213    {
214        self.then(|_| Task::none())
215    }
216
217    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
218    pub fn abortable(self) -> (Self, Handle)
219    where
220        T: 'static,
221    {
222        let (stream, handle) = match self.stream {
223            Some(stream) => {
224                let (stream, handle) = stream::abortable(stream);
225
226                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
227            }
228            None => (
229                None,
230                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
231            ),
232        };
233
234        (
235            Self {
236                stream,
237                units: self.units,
238            },
239            Handle { internal: handle },
240        )
241    }
242
243    /// Creates a new [`Task`] that runs the given [`Future`] and produces
244    /// its output.
245    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
246    where
247        T: 'static,
248    {
249        Self::stream(stream::once(future))
250    }
251
252    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
253    /// each of its items.
254    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
255    where
256        T: 'static,
257    {
258        Self {
259            stream: Some(boxed_stream(
260                stream::once(yield_now())
261                    .filter_map(|_| async { None })
262                    .chain(stream.map(Action::Output)),
263            )),
264            units: 1,
265        }
266    }
267
268    /// Returns the amount of work "units" of the [`Task`].
269    pub fn units(&self) -> usize {
270        self.units
271    }
272}
273
274impl<T> std::fmt::Debug for Task<T> {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
277            .field("units", &self.units)
278            .finish()
279    }
280}
281
282/// A handle to a [`Task`] that can be used for aborting it.
283#[derive(Debug, Clone)]
284pub struct Handle {
285    internal: InternalHandle,
286}
287
288#[derive(Debug, Clone)]
289enum InternalHandle {
290    Manual(stream::AbortHandle),
291    AbortOnDrop(Arc<stream::AbortHandle>),
292}
293
294impl InternalHandle {
295    pub fn as_ref(&self) -> &stream::AbortHandle {
296        match self {
297            InternalHandle::Manual(handle) => handle,
298            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
299        }
300    }
301}
302
303impl Handle {
304    /// Aborts the [`Task`] of this [`Handle`].
305    pub fn abort(&self) {
306        self.internal.as_ref().abort();
307    }
308
309    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
310    /// all of its instances are dropped.
311    ///
312    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
313    /// once all of the clones are dropped.
314    ///
315    /// This can be really useful if you do not want to worry about calling
316    /// [`Handle::abort`] yourself.
317    pub fn abort_on_drop(self) -> Self {
318        match &self.internal {
319            InternalHandle::Manual(handle) => Self {
320                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
321            },
322            InternalHandle::AbortOnDrop(_) => self,
323        }
324    }
325
326    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
327    pub fn is_aborted(&self) -> bool {
328        self.internal.as_ref().is_aborted()
329    }
330}
331
332impl Drop for Handle {
333    fn drop(&mut self) {
334        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
335            let handle = std::mem::replace(handle, Arc::new(stream::AbortHandle::new_pair().0));
336
337            if let Some(handle) = Arc::into_inner(handle) {
338                handle.abort();
339            }
340        }
341    }
342}
343
344impl<T> Task<Option<T>> {
345    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
346    ///
347    /// The value is provided to the closure to create the subsequent [`Task`].
348    pub fn and_then<A>(self, f: impl Fn(T) -> Task<A> + MaybeSend + 'static) -> Task<A>
349    where
350        T: MaybeSend + 'static,
351        A: MaybeSend + 'static,
352    {
353        self.then(move |option| option.map_or_else(Task::none, &f))
354    }
355}
356
357impl<T, E> Task<Result<T, E>> {
358    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
359    ///
360    /// The success value is provided to the closure to create the subsequent [`Task`].
361    pub fn and_then<A>(
362        self,
363        f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
364    ) -> Task<Result<A, E>>
365    where
366        T: MaybeSend + 'static,
367        E: MaybeSend + 'static,
368        A: MaybeSend + 'static,
369    {
370        self.then(move |result| result.map_or_else(|error| Task::done(Err(error)), &f))
371    }
372
373    /// Maps the error type of this [`Task`] to a different one using the given
374    /// function.
375    pub fn map_err<E2>(self, f: impl Fn(E) -> E2 + MaybeSend + 'static) -> Task<Result<T, E2>>
376    where
377        T: MaybeSend + 'static,
378        E: MaybeSend + 'static,
379        E2: MaybeSend + 'static,
380    {
381        self.map(move |result| result.map_err(&f))
382    }
383}
384
385impl<T> Default for Task<T> {
386    fn default() -> Self {
387        Self::none()
388    }
389}
390
391impl<T> From<()> for Task<T> {
392    fn from(_value: ()) -> Self {
393        Self::none()
394    }
395}
396
397/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
398/// its output.
399pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
400where
401    T: Send + 'static,
402{
403    channel(move |sender| {
404        let operation = widget::operation::map(Box::new(operation), move |value| {
405            let _ = sender.clone().try_send(value);
406        });
407
408        Action::Widget(Box::new(operation))
409    })
410}
411
412/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
413/// produces the value fed to the [`oneshot::Sender`].
414pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
415where
416    T: MaybeSend + 'static,
417{
418    let (sender, receiver) = oneshot::channel();
419
420    let action = f(sender);
421
422    Task {
423        stream: Some(boxed_stream(
424            stream::once(async move { action }).chain(
425                receiver
426                    .into_stream()
427                    .filter_map(|result| async move { Some(Action::Output(result.ok()?)) }),
428            ),
429        )),
430        units: 1,
431    }
432}
433
434/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
435/// produces the values fed to the [`mpsc::Sender`].
436pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
437where
438    T: MaybeSend + 'static,
439{
440    let (sender, receiver) = mpsc::channel(1);
441
442    let action = f(sender);
443
444    Task {
445        stream: Some(boxed_stream(
446            stream::once(async move { action })
447                .chain(receiver.map(|result| Action::Output(result))),
448        )),
449        units: 1,
450    }
451}
452
453/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
454pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
455    let action = action.into();
456
457    Task {
458        stream: Some(boxed_stream(stream::once(async move {
459            action.output().expect_err("no output")
460        }))),
461        units: 1,
462    }
463}
464
465/// Returns the underlying [`Stream`] of the [`Task`].
466pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
467    task.stream
468}
469
470/// Creates a new [`Task`] that will run the given closure in a new thread.
471///
472/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
473/// by the [`Task`].
474pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
475where
476    T: Send + 'static,
477{
478    let (sender, receiver) = mpsc::channel(1);
479
480    let _ = thread::spawn(move || {
481        f(sender);
482    });
483
484    Task::stream(receiver)
485}
486
487/// Creates a new [`Task`] that will run the given closure that can fail in a new
488/// thread.
489///
490/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
491/// by the [`Task`].
492pub fn try_blocking<T, E>(
493    f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
494) -> Task<Result<T, E>>
495where
496    T: Send + 'static,
497    E: Send + 'static,
498{
499    let (sender, receiver) = mpsc::channel(1);
500    let (error_sender, error_receiver) = oneshot::channel();
501
502    let _ = thread::spawn(move || {
503        if let Err(error) = f(sender) {
504            let _ = error_sender.send(Err(error));
505        }
506    });
507
508    Task::stream(stream::select(
509        receiver.map(Ok),
510        stream::once(error_receiver).filter_map(async |result| result.ok()),
511    ))
512}
513
// Completes on the second poll: the first poll wakes the waker right away
// and reports `Pending`, any later poll reports `Ready`.
async fn yield_now() {
    let mut yielded = false;

    std::future::poll_fn(move |cx| {
        if yielded {
            return task::Poll::Ready(());
        }

        yielded = true;

        // Ask to be polled again immediately.
        cx.waker().wake_by_ref();

        task::Poll::Pending
    })
    .await;
}