1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
20#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
24pub struct Task<T> {
25 stream: Option<BoxStream<Action<T>>>,
26 units: usize,
27}
28
29impl<T> Task<T> {
30 pub fn none() -> Self {
32 Self {
33 stream: None,
34 units: 0,
35 }
36 }
37
38 pub fn done(value: T) -> Self
40 where
41 T: MaybeSend + 'static,
42 {
43 Self::future(future::ready(value))
44 }
45
46 pub fn perform<A>(
49 future: impl Future<Output = A> + MaybeSend + 'static,
50 f: impl FnOnce(A) -> T + MaybeSend + 'static,
51 ) -> Self
52 where
53 T: MaybeSend + 'static,
54 A: MaybeSend + 'static,
55 {
56 Self::future(future.map(f))
57 }
58
59 pub fn run<A>(
62 stream: impl Stream<Item = A> + MaybeSend + 'static,
63 f: impl Fn(A) -> T + MaybeSend + 'static,
64 ) -> Self
65 where
66 T: 'static,
67 {
68 Self::stream(stream.map(f))
69 }
70
71 #[cfg(feature = "sipper")]
74 pub fn sip<S>(
75 sipper: S,
76 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
77 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
78 ) -> Self
79 where
80 S: sipper::Core + MaybeSend + 'static,
81 T: MaybeSend + 'static,
82 {
83 Self::stream(stream(sipper::sipper(move |sender| async move {
84 on_output(sipper.with(on_progress).run(sender).await)
85 })))
86 }
87
88 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
91 where
92 T: 'static,
93 {
94 let mut select_all = stream::SelectAll::new();
95 let mut units = 0;
96
97 for task in tasks.into_iter() {
98 if let Some(stream) = task.stream {
99 select_all.push(stream);
100 }
101
102 units += task.units;
103 }
104
105 Self {
106 stream: Some(boxed_stream(select_all)),
107 units,
108 }
109 }
110
111 pub fn map<O>(
113 self,
114 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
115 ) -> Task<O>
116 where
117 T: MaybeSend + 'static,
118 O: MaybeSend + 'static,
119 {
120 self.then(move |output| Task::done(f(output)))
121 }
122
123 pub fn then<O>(
129 self,
130 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
131 ) -> Task<O>
132 where
133 T: MaybeSend + 'static,
134 O: MaybeSend + 'static,
135 {
136 Task {
137 stream: match self.stream {
138 None => None,
139 Some(stream) => {
140 Some(boxed_stream(stream.flat_map(move |action| {
141 match action.output() {
142 Ok(output) => {
143 f(output).stream.unwrap_or_else(|| {
144 boxed_stream(stream::empty())
145 })
146 }
147 Err(action) => boxed_stream(stream::once(
148 async move { action },
149 )),
150 }
151 })))
152 }
153 },
154 units: self.units,
155 }
156 }
157
158 pub fn chain(self, task: Self) -> Self
160 where
161 T: 'static,
162 {
163 match self.stream {
164 None => task,
165 Some(first) => match task.stream {
166 None => Self {
167 stream: Some(first),
168 units: self.units,
169 },
170 Some(second) => Self {
171 stream: Some(boxed_stream(first.chain(second))),
172 units: self.units + task.units,
173 },
174 },
175 }
176 }
177
178 pub fn collect(self) -> Task<Vec<T>>
180 where
181 T: MaybeSend + 'static,
182 {
183 match self.stream {
184 None => Task::done(Vec::new()),
185 Some(stream) => Task {
186 stream: Some(boxed_stream(
187 stream::unfold(
188 (stream, Some(Vec::new())),
189 move |(mut stream, outputs)| async move {
190 let mut outputs = outputs?;
191
192 let Some(action) = stream.next().await else {
193 return Some((
194 Some(Action::Output(outputs)),
195 (stream, None),
196 ));
197 };
198
199 match action.output() {
200 Ok(output) => {
201 outputs.push(output);
202
203 Some((None, (stream, Some(outputs))))
204 }
205 Err(action) => Some((
206 Some(action),
207 (stream, Some(outputs)),
208 )),
209 }
210 },
211 )
212 .filter_map(future::ready),
213 )),
214 units: self.units,
215 },
216 }
217 }
218
219 pub fn discard<O>(self) -> Task<O>
223 where
224 T: MaybeSend + 'static,
225 O: MaybeSend + 'static,
226 {
227 self.then(|_| Task::none())
228 }
229
230 pub fn abortable(self) -> (Self, Handle)
232 where
233 T: 'static,
234 {
235 let (stream, handle) = match self.stream {
236 Some(stream) => {
237 let (stream, handle) = stream::abortable(stream);
238
239 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
240 }
241 None => (
242 None,
243 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
244 ),
245 };
246
247 (
248 Self {
249 stream,
250 units: self.units,
251 },
252 Handle { internal: handle },
253 )
254 }
255
256 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
259 where
260 T: 'static,
261 {
262 Self::stream(stream::once(future))
263 }
264
265 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
268 where
269 T: 'static,
270 {
271 Self {
272 stream: Some(boxed_stream(
273 stream::once(yield_now())
274 .filter_map(|_| async { None })
275 .chain(stream.map(Action::Output)),
276 )),
277 units: 1,
278 }
279 }
280
281 pub fn units(&self) -> usize {
283 self.units
284 }
285}
286
287impl<T> std::fmt::Debug for Task<T> {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
290 .field("units", &self.units)
291 .finish()
292 }
293}
294
295#[derive(Debug, Clone)]
297pub struct Handle {
298 internal: InternalHandle,
299}
300
301#[derive(Debug, Clone)]
302enum InternalHandle {
303 Manual(stream::AbortHandle),
304 AbortOnDrop(Arc<stream::AbortHandle>),
305}
306
307impl InternalHandle {
308 pub fn as_ref(&self) -> &stream::AbortHandle {
309 match self {
310 InternalHandle::Manual(handle) => handle,
311 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
312 }
313 }
314}
315
316impl Handle {
317 pub fn abort(&self) {
319 self.internal.as_ref().abort();
320 }
321
322 pub fn abort_on_drop(self) -> Self {
331 match &self.internal {
332 InternalHandle::Manual(handle) => Self {
333 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
334 },
335 InternalHandle::AbortOnDrop(_) => self,
336 }
337 }
338
339 pub fn is_aborted(&self) -> bool {
341 self.internal.as_ref().is_aborted()
342 }
343}
344
345impl Drop for Handle {
346 fn drop(&mut self) {
347 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
348 let handle = std::mem::replace(
349 handle,
350 Arc::new(stream::AbortHandle::new_pair().0),
351 );
352
353 if let Some(handle) = Arc::into_inner(handle) {
354 handle.abort();
355 }
356 }
357 }
358}
359
360impl<T> Task<Option<T>> {
361 pub fn and_then<A>(
365 self,
366 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
367 ) -> Task<A>
368 where
369 T: MaybeSend + 'static,
370 A: MaybeSend + 'static,
371 {
372 self.then(move |option| option.map_or_else(Task::none, &f))
373 }
374}
375
376impl<T, E> Task<Result<T, E>> {
377 pub fn and_then<A>(
381 self,
382 f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
383 ) -> Task<Result<A, E>>
384 where
385 T: MaybeSend + 'static,
386 E: MaybeSend + 'static,
387 A: MaybeSend + 'static,
388 {
389 self.then(move |result| {
390 result.map_or_else(|error| Task::done(Err(error)), &f)
391 })
392 }
393
394 pub fn map_err<E2>(
397 self,
398 f: impl Fn(E) -> E2 + MaybeSend + 'static,
399 ) -> Task<Result<T, E2>>
400 where
401 T: MaybeSend + 'static,
402 E: MaybeSend + 'static,
403 E2: MaybeSend + 'static,
404 {
405 self.map(move |result| result.map_err(&f))
406 }
407}
408
409impl<T> Default for Task<T> {
410 fn default() -> Self {
411 Self::none()
412 }
413}
414
415impl<T> From<()> for Task<T> {
416 fn from(_value: ()) -> Self {
417 Self::none()
418 }
419}
420
421pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
424where
425 T: Send + 'static,
426{
427 channel(move |sender| {
428 let operation =
429 widget::operation::map(Box::new(operation), move |value| {
430 let _ = sender.clone().try_send(value);
431 });
432
433 Action::Widget(Box::new(operation))
434 })
435}
436
437pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
440where
441 T: MaybeSend + 'static,
442{
443 let (sender, receiver) = oneshot::channel();
444
445 let action = f(sender);
446
447 Task {
448 stream: Some(boxed_stream(stream::once(async move { action }).chain(
449 receiver.into_stream().filter_map(|result| async move {
450 Some(Action::Output(result.ok()?))
451 }),
452 ))),
453 units: 1,
454 }
455}
456
457pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
460where
461 T: MaybeSend + 'static,
462{
463 let (sender, receiver) = mpsc::channel(1);
464
465 let action = f(sender);
466
467 Task {
468 stream: Some(boxed_stream(
469 stream::once(async move { action })
470 .chain(receiver.map(|result| Action::Output(result))),
471 )),
472 units: 1,
473 }
474}
475
476pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
478 let action = action.into();
479
480 Task {
481 stream: Some(boxed_stream(stream::once(async move {
482 action.output().expect_err("no output")
483 }))),
484 units: 1,
485 }
486}
487
488pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
490 task.stream
491}
492
493pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
498where
499 T: Send + 'static,
500{
501 let (sender, receiver) = mpsc::channel(1);
502
503 let _ = thread::spawn(move || {
504 f(sender);
505 });
506
507 Task::stream(receiver)
508}
509
510pub fn try_blocking<T, E>(
516 f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
517) -> Task<Result<T, E>>
518where
519 T: Send + 'static,
520 E: Send + 'static,
521{
522 let (sender, receiver) = mpsc::channel(1);
523 let (error_sender, error_receiver) = oneshot::channel();
524
525 let _ = thread::spawn(move || {
526 if let Err(error) = f(sender) {
527 let _ = error_sender.send(Err(error));
528 }
529 });
530
531 Task::stream(stream::select(
532 receiver.map(Ok),
533 stream::once(error_receiver).filter_map(async |result| result.ok()),
534 ))
535}
536
/// Yields back to the executor exactly once.
///
/// The first poll wakes the current waker and returns `Pending`; the next
/// poll completes.
async fn yield_now() {
    let mut has_yielded = false;

    std::future::poll_fn(move |cx| {
        if has_yielded {
            task::Poll::Ready(())
        } else {
            has_yielded = true;

            // Request an immediate re-poll before reporting `Pending`, so
            // the future is never left waiting for an external wake-up.
            cx.waker().wake_by_ref();

            task::Poll::Pending
        }
    })
    .await;
}