iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
/// A set of concurrent actions to be performed by the iced runtime.
///
/// A [`Task`] _may_ produce a bunch of values of type `T`.
#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
pub struct Task<T> {
    // The boxed stream of actions to run; `None` for a task that does nothing.
    stream: Option<BoxStream<Action<T>>>,
    // Amount of work "units" of this task, as reported by `Task::units`.
    units: usize,
}
28
29impl<T> Task<T> {
30    /// Creates a [`Task`] that does nothing.
31    pub fn none() -> Self {
32        Self {
33            stream: None,
34            units: 0,
35        }
36    }
37
38    /// Creates a new [`Task`] that instantly produces the given value.
39    pub fn done(value: T) -> Self
40    where
41        T: MaybeSend + 'static,
42    {
43        Self::future(future::ready(value))
44    }
45
46    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
47    /// output with the given closure.
48    pub fn perform<A>(
49        future: impl Future<Output = A> + MaybeSend + 'static,
50        f: impl FnOnce(A) -> T + MaybeSend + 'static,
51    ) -> Self
52    where
53        T: MaybeSend + 'static,
54        A: MaybeSend + 'static,
55    {
56        Self::future(future.map(f))
57    }
58
59    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
60    /// item with the given closure.
61    pub fn run<A>(
62        stream: impl Stream<Item = A> + MaybeSend + 'static,
63        f: impl Fn(A) -> T + MaybeSend + 'static,
64    ) -> Self
65    where
66        T: 'static,
67    {
68        Self::stream(stream.map(f))
69    }
70
71    /// Creates a [`Task`] that runs the given [`Sipper`] to completion, mapping
72    /// progress with the first closure and the output with the second one.
73    #[cfg(feature = "sipper")]
74    pub fn sip<S>(
75        sipper: S,
76        on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
77        on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
78    ) -> Self
79    where
80        S: sipper::Core + MaybeSend + 'static,
81        T: MaybeSend + 'static,
82    {
83        Self::stream(stream(sipper::sipper(move |sender| async move {
84            on_output(sipper.with(on_progress).run(sender).await)
85        })))
86    }
87
88    /// Combines the given tasks and produces a single [`Task`] that will run all of them
89    /// in parallel.
90    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
91    where
92        T: 'static,
93    {
94        let mut select_all = stream::SelectAll::new();
95        let mut units = 0;
96
97        for task in tasks.into_iter() {
98            if let Some(stream) = task.stream {
99                select_all.push(stream);
100            }
101
102            units += task.units;
103        }
104
105        Self {
106            stream: Some(boxed_stream(select_all)),
107            units,
108        }
109    }
110
111    /// Maps the output of a [`Task`] with the given closure.
112    pub fn map<O>(self, mut f: impl FnMut(T) -> O + MaybeSend + 'static) -> Task<O>
113    where
114        T: MaybeSend + 'static,
115        O: MaybeSend + 'static,
116    {
117        self.then(move |output| Task::done(f(output)))
118    }
119
120    /// Performs a new [`Task`] for every output of the current [`Task`] using the
121    /// given closure.
122    ///
123    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
124    /// [`Stream`].
125    pub fn then<O>(self, mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static) -> Task<O>
126    where
127        T: MaybeSend + 'static,
128        O: MaybeSend + 'static,
129    {
130        Task {
131            stream: match self.stream {
132                None => None,
133                Some(stream) => Some(boxed_stream(stream.flat_map(move |action| {
134                    match action.output() {
135                        Ok(output) => f(output)
136                            .stream
137                            .unwrap_or_else(|| boxed_stream(stream::empty())),
138                        Err(action) => boxed_stream(stream::once(async move { action })),
139                    }
140                }))),
141            },
142            units: self.units,
143        }
144    }
145
146    /// Chains a new [`Task`] to be performed once the current one finishes completely.
147    pub fn chain(self, task: Self) -> Self
148    where
149        T: 'static,
150    {
151        match self.stream {
152            None => task,
153            Some(first) => match task.stream {
154                None => Self {
155                    stream: Some(first),
156                    units: self.units,
157                },
158                Some(second) => Self {
159                    stream: Some(boxed_stream(first.chain(second))),
160                    units: self.units + task.units,
161                },
162            },
163        }
164    }
165
166    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
167    pub fn collect(self) -> Task<Vec<T>>
168    where
169        T: MaybeSend + 'static,
170    {
171        match self.stream {
172            None => Task::done(Vec::new()),
173            Some(stream) => Task {
174                stream: Some(boxed_stream(
175                    stream::unfold(
176                        (stream, Some(Vec::new())),
177                        move |(mut stream, outputs)| async move {
178                            let mut outputs = outputs?;
179
180                            let Some(action) = stream.next().await else {
181                                return Some((Some(Action::Output(outputs)), (stream, None)));
182                            };
183
184                            match action.output() {
185                                Ok(output) => {
186                                    outputs.push(output);
187
188                                    Some((None, (stream, Some(outputs))))
189                                }
190                                Err(action) => Some((Some(action), (stream, Some(outputs)))),
191                            }
192                        },
193                    )
194                    .filter_map(future::ready),
195                )),
196                units: self.units,
197            },
198        }
199    }
200
201    /// Creates a new [`Task`] that discards the result of the current one.
202    ///
203    /// Useful if you only care about the side effects of a [`Task`].
204    pub fn discard<O>(self) -> Task<O>
205    where
206        T: MaybeSend + 'static,
207        O: MaybeSend + 'static,
208    {
209        self.then(|_| Task::none())
210    }
211
212    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
213    pub fn abortable(self) -> (Self, Handle)
214    where
215        T: 'static,
216    {
217        let (stream, handle) = match self.stream {
218            Some(stream) => {
219                let (stream, handle) = stream::abortable(stream);
220
221                (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
222            }
223            None => (
224                None,
225                InternalHandle::Manual(stream::AbortHandle::new_pair().0),
226            ),
227        };
228
229        (
230            Self {
231                stream,
232                units: self.units,
233            },
234            Handle { internal: handle },
235        )
236    }
237
238    /// Creates a new [`Task`] that runs the given [`Future`] and produces
239    /// its output.
240    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
241    where
242        T: 'static,
243    {
244        Self::stream(stream::once(future))
245    }
246
247    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
248    /// each of its items.
249    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
250    where
251        T: 'static,
252    {
253        Self {
254            stream: Some(boxed_stream(
255                stream::once(yield_now())
256                    .filter_map(|_| async { None })
257                    .chain(stream.map(Action::Output)),
258            )),
259            units: 1,
260        }
261    }
262
263    /// Returns the amount of work "units" of the [`Task`].
264    pub fn units(&self) -> usize {
265        self.units
266    }
267}
268
269impl<T> std::fmt::Debug for Task<T> {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
272            .field("units", &self.units)
273            .finish()
274    }
275}
276
/// A handle to a [`Task`] that can be used for aborting it.
#[derive(Debug, Clone)]
pub struct Handle {
    // Either a plain abort handle or a shared one that aborts the task once
    // the last clone of this handle is dropped.
    internal: InternalHandle,
}
282
// The abort strategy backing a `Handle`.
#[derive(Debug, Clone)]
enum InternalHandle {
    // Aborts only when `Handle::abort` is called explicitly.
    Manual(stream::AbortHandle),
    // Shared between clones; `Drop for Handle` aborts once the last clone goes away.
    AbortOnDrop(Arc<stream::AbortHandle>),
}
288
289impl InternalHandle {
290    pub fn as_ref(&self) -> &stream::AbortHandle {
291        match self {
292            InternalHandle::Manual(handle) => handle,
293            InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
294        }
295    }
296}
297
298impl Handle {
299    /// Aborts the [`Task`] of this [`Handle`].
300    pub fn abort(&self) {
301        self.internal.as_ref().abort();
302    }
303
304    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
305    /// all of its instances are dropped.
306    ///
307    /// If a [`Handle`] is cloned, [`Handle::abort`] will only be called
308    /// once all of the clones are dropped.
309    ///
310    /// This can be really useful if you do not want to worry about calling
311    /// [`Handle::abort`] yourself.
312    pub fn abort_on_drop(self) -> Self {
313        match &self.internal {
314            InternalHandle::Manual(handle) => Self {
315                internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
316            },
317            InternalHandle::AbortOnDrop(_) => self,
318        }
319    }
320
321    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
322    pub fn is_aborted(&self) -> bool {
323        self.internal.as_ref().is_aborted()
324    }
325}
326
327impl Drop for Handle {
328    fn drop(&mut self) {
329        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
330            let handle = std::mem::replace(handle, Arc::new(stream::AbortHandle::new_pair().0));
331
332            if let Some(handle) = Arc::into_inner(handle) {
333                handle.abort();
334            }
335        }
336    }
337}
338
339impl<T> Task<Option<T>> {
340    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
341    ///
342    /// The value is provided to the closure to create the subsequent [`Task`].
343    pub fn and_then<A>(self, f: impl Fn(T) -> Task<A> + MaybeSend + 'static) -> Task<A>
344    where
345        T: MaybeSend + 'static,
346        A: MaybeSend + 'static,
347    {
348        self.then(move |option| option.map_or_else(Task::none, &f))
349    }
350}
351
352impl<T, E> Task<Result<T, E>> {
353    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
354    ///
355    /// The success value is provided to the closure to create the subsequent [`Task`].
356    pub fn and_then<A>(
357        self,
358        f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
359    ) -> Task<Result<A, E>>
360    where
361        T: MaybeSend + 'static,
362        E: MaybeSend + 'static,
363        A: MaybeSend + 'static,
364    {
365        self.then(move |result| result.map_or_else(|error| Task::done(Err(error)), &f))
366    }
367
368    /// Maps the error type of this [`Task`] to a different one using the given
369    /// function.
370    pub fn map_err<E2>(self, f: impl Fn(E) -> E2 + MaybeSend + 'static) -> Task<Result<T, E2>>
371    where
372        T: MaybeSend + 'static,
373        E: MaybeSend + 'static,
374        E2: MaybeSend + 'static,
375    {
376        self.map(move |result| result.map_err(&f))
377    }
378}
379
380impl<T> Default for Task<T> {
381    fn default() -> Self {
382        Self::none()
383    }
384}
385
386impl<T> From<()> for Task<T> {
387    fn from(_value: ()) -> Self {
388        Self::none()
389    }
390}
391
392/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
393/// its output.
394pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
395where
396    T: Send + 'static,
397{
398    channel(move |sender| {
399        let operation = widget::operation::map(Box::new(operation), move |value| {
400            let _ = sender.clone().try_send(value);
401        });
402
403        Action::Widget(Box::new(operation))
404    })
405}
406
407/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
408/// produces the value fed to the [`oneshot::Sender`].
409pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
410where
411    T: MaybeSend + 'static,
412{
413    let (sender, receiver) = oneshot::channel();
414
415    let action = f(sender);
416
417    Task {
418        stream: Some(boxed_stream(
419            stream::once(async move { action }).chain(
420                receiver
421                    .into_stream()
422                    .filter_map(|result| async move { Some(Action::Output(result.ok()?)) }),
423            ),
424        )),
425        units: 1,
426    }
427}
428
429/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
430/// produces the values fed to the [`mpsc::Sender`].
431pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
432where
433    T: MaybeSend + 'static,
434{
435    let (sender, receiver) = mpsc::channel(1);
436
437    let action = f(sender);
438
439    Task {
440        stream: Some(boxed_stream(
441            stream::once(async move { action })
442                .chain(receiver.map(|result| Action::Output(result))),
443        )),
444        units: 1,
445    }
446}
447
448/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
449pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
450    let action = action.into();
451
452    Task {
453        stream: Some(boxed_stream(stream::once(async move {
454            action.output().expect_err("no output")
455        }))),
456        units: 1,
457    }
458}
459
460/// Returns the underlying [`Stream`] of the [`Task`].
461pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
462    task.stream
463}
464
465/// Creates a new [`Task`] that will run the given closure in a new thread.
466///
467/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
468/// by the [`Task`].
469pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
470where
471    T: Send + 'static,
472{
473    let (sender, receiver) = mpsc::channel(1);
474
475    let _ = thread::spawn(move || {
476        f(sender);
477    });
478
479    Task::stream(receiver)
480}
481
482/// Creates a new [`Task`] that will run the given closure that can fail in a new
483/// thread.
484///
485/// Any data sent by the closure through the [`mpsc::Sender`] will be produced
486/// by the [`Task`].
487pub fn try_blocking<T, E>(
488    f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
489) -> Task<Result<T, E>>
490where
491    T: Send + 'static,
492    E: Send + 'static,
493{
494    let (sender, receiver) = mpsc::channel(1);
495    let (error_sender, error_receiver) = oneshot::channel();
496
497    let _ = thread::spawn(move || {
498        if let Err(error) = f(sender) {
499            let _ = error_sender.send(Err(error));
500        }
501    });
502
503    Task::stream(stream::select(
504        receiver.map(Ok),
505        stream::once(error_receiver).filter_map(async |result| result.ok()),
506    ))
507}
508
// Yields control back to the executor exactly once.
//
// The first poll wakes the current task and returns `Pending`; every later
// poll completes immediately.
async fn yield_now() {
    let mut first_poll = true;

    std::future::poll_fn(move |cx| {
        if std::mem::replace(&mut first_poll, false) {
            cx.waker().wake_by_ref();

            task::Poll::Pending
        } else {
            task::Poll::Ready(())
        }
    })
    .await;
}