1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
20#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
24pub struct Task<T> {
25 stream: Option<BoxStream<Action<T>>>,
26 units: usize,
27}
28
29impl<T> Task<T> {
30 pub fn none() -> Self {
32 Self {
33 stream: None,
34 units: 0,
35 }
36 }
37
38 pub fn done(value: T) -> Self
40 where
41 T: MaybeSend + 'static,
42 {
43 Self {
44 stream: Some(boxed_stream(stream::once(future::ready(Action::Output(
45 value,
46 ))))),
47 units: 0,
48 }
49 }
50
51 pub fn perform<A>(
54 future: impl Future<Output = A> + MaybeSend + 'static,
55 f: impl FnOnce(A) -> T + MaybeSend + 'static,
56 ) -> Self
57 where
58 T: MaybeSend + 'static,
59 A: MaybeSend + 'static,
60 {
61 Self::future(future.map(f))
62 }
63
64 pub fn run<A>(
67 stream: impl Stream<Item = A> + MaybeSend + 'static,
68 f: impl Fn(A) -> T + MaybeSend + 'static,
69 ) -> Self
70 where
71 T: 'static,
72 {
73 Self::stream(stream.map(f))
74 }
75
76 #[cfg(feature = "sipper")]
79 pub fn sip<S>(
80 sipper: S,
81 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
82 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
83 ) -> Self
84 where
85 S: sipper::Core + MaybeSend + 'static,
86 T: MaybeSend + 'static,
87 {
88 Self::stream(stream(sipper::sipper(move |sender| async move {
89 on_output(sipper.with(on_progress).run(sender).await)
90 })))
91 }
92
93 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
96 where
97 T: 'static,
98 {
99 let mut select_all = stream::SelectAll::new();
100 let mut units = 0;
101
102 for task in tasks.into_iter() {
103 if let Some(stream) = task.stream {
104 select_all.push(stream);
105 }
106
107 units += task.units;
108 }
109
110 Self {
111 stream: Some(boxed_stream(select_all)),
112 units,
113 }
114 }
115
116 pub fn map<O>(self, mut f: impl FnMut(T) -> O + MaybeSend + 'static) -> Task<O>
118 where
119 T: MaybeSend + 'static,
120 O: MaybeSend + 'static,
121 {
122 self.then(move |output| Task::done(f(output)))
123 }
124
125 pub fn then<O>(self, mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static) -> Task<O>
131 where
132 T: MaybeSend + 'static,
133 O: MaybeSend + 'static,
134 {
135 Task {
136 stream: match self.stream {
137 None => None,
138 Some(stream) => Some(boxed_stream(stream.flat_map(move |action| {
139 match action.output() {
140 Ok(output) => f(output)
141 .stream
142 .unwrap_or_else(|| boxed_stream(stream::empty())),
143 Err(action) => boxed_stream(stream::once(async move { action })),
144 }
145 }))),
146 },
147 units: self.units,
148 }
149 }
150
151 pub fn chain(self, task: Self) -> Self
153 where
154 T: 'static,
155 {
156 match self.stream {
157 None => task,
158 Some(first) => match task.stream {
159 None => Self {
160 stream: Some(first),
161 units: self.units,
162 },
163 Some(second) => Self {
164 stream: Some(boxed_stream(first.chain(second))),
165 units: self.units + task.units,
166 },
167 },
168 }
169 }
170
171 pub fn collect(self) -> Task<Vec<T>>
173 where
174 T: MaybeSend + 'static,
175 {
176 match self.stream {
177 None => Task::done(Vec::new()),
178 Some(stream) => Task {
179 stream: Some(boxed_stream(
180 stream::unfold(
181 (stream, Some(Vec::new())),
182 move |(mut stream, outputs)| async move {
183 let mut outputs = outputs?;
184
185 let Some(action) = stream.next().await else {
186 return Some((Some(Action::Output(outputs)), (stream, None)));
187 };
188
189 match action.output() {
190 Ok(output) => {
191 outputs.push(output);
192
193 Some((None, (stream, Some(outputs))))
194 }
195 Err(action) => Some((Some(action), (stream, Some(outputs)))),
196 }
197 },
198 )
199 .filter_map(future::ready),
200 )),
201 units: self.units,
202 },
203 }
204 }
205
206 pub fn discard<O>(self) -> Task<O>
210 where
211 T: MaybeSend + 'static,
212 O: MaybeSend + 'static,
213 {
214 self.then(|_| Task::none())
215 }
216
217 pub fn abortable(self) -> (Self, Handle)
219 where
220 T: 'static,
221 {
222 let (stream, handle) = match self.stream {
223 Some(stream) => {
224 let (stream, handle) = stream::abortable(stream);
225
226 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
227 }
228 None => (
229 None,
230 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
231 ),
232 };
233
234 (
235 Self {
236 stream,
237 units: self.units,
238 },
239 Handle { internal: handle },
240 )
241 }
242
243 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
246 where
247 T: 'static,
248 {
249 Self::stream(stream::once(future))
250 }
251
252 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
255 where
256 T: 'static,
257 {
258 Self {
259 stream: Some(boxed_stream(
260 stream::once(yield_now())
261 .filter_map(|_| async { None })
262 .chain(stream.map(Action::Output)),
263 )),
264 units: 1,
265 }
266 }
267
268 pub fn units(&self) -> usize {
270 self.units
271 }
272}
273
274impl<T> std::fmt::Debug for Task<T> {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
277 .field("units", &self.units)
278 .finish()
279 }
280}
281
282#[derive(Debug, Clone)]
284pub struct Handle {
285 internal: InternalHandle,
286}
287
288#[derive(Debug, Clone)]
289enum InternalHandle {
290 Manual(stream::AbortHandle),
291 AbortOnDrop(Arc<stream::AbortHandle>),
292}
293
294impl InternalHandle {
295 pub fn as_ref(&self) -> &stream::AbortHandle {
296 match self {
297 InternalHandle::Manual(handle) => handle,
298 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
299 }
300 }
301}
302
303impl Handle {
304 pub fn abort(&self) {
306 self.internal.as_ref().abort();
307 }
308
309 pub fn abort_on_drop(self) -> Self {
318 match &self.internal {
319 InternalHandle::Manual(handle) => Self {
320 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
321 },
322 InternalHandle::AbortOnDrop(_) => self,
323 }
324 }
325
326 pub fn is_aborted(&self) -> bool {
328 self.internal.as_ref().is_aborted()
329 }
330}
331
332impl Drop for Handle {
333 fn drop(&mut self) {
334 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
335 let handle = std::mem::replace(handle, Arc::new(stream::AbortHandle::new_pair().0));
336
337 if let Some(handle) = Arc::into_inner(handle) {
338 handle.abort();
339 }
340 }
341 }
342}
343
344impl<T> Task<Option<T>> {
345 pub fn and_then<A>(self, f: impl Fn(T) -> Task<A> + MaybeSend + 'static) -> Task<A>
349 where
350 T: MaybeSend + 'static,
351 A: MaybeSend + 'static,
352 {
353 self.then(move |option| option.map_or_else(Task::none, &f))
354 }
355}
356
357impl<T, E> Task<Result<T, E>> {
358 pub fn and_then<A>(
362 self,
363 f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
364 ) -> Task<Result<A, E>>
365 where
366 T: MaybeSend + 'static,
367 E: MaybeSend + 'static,
368 A: MaybeSend + 'static,
369 {
370 self.then(move |result| result.map_or_else(|error| Task::done(Err(error)), &f))
371 }
372
373 pub fn map_err<E2>(self, f: impl Fn(E) -> E2 + MaybeSend + 'static) -> Task<Result<T, E2>>
376 where
377 T: MaybeSend + 'static,
378 E: MaybeSend + 'static,
379 E2: MaybeSend + 'static,
380 {
381 self.map(move |result| result.map_err(&f))
382 }
383}
384
385impl<T> Default for Task<T> {
386 fn default() -> Self {
387 Self::none()
388 }
389}
390
391impl<T> From<()> for Task<T> {
392 fn from(_value: ()) -> Self {
393 Self::none()
394 }
395}
396
397pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
400where
401 T: Send + 'static,
402{
403 channel(move |sender| {
404 let operation = widget::operation::map(Box::new(operation), move |value| {
405 let _ = sender.clone().try_send(value);
406 });
407
408 Action::Widget(Box::new(operation))
409 })
410}
411
412pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
415where
416 T: MaybeSend + 'static,
417{
418 let (sender, receiver) = oneshot::channel();
419
420 let action = f(sender);
421
422 Task {
423 stream: Some(boxed_stream(
424 stream::once(async move { action }).chain(
425 receiver
426 .into_stream()
427 .filter_map(|result| async move { Some(Action::Output(result.ok()?)) }),
428 ),
429 )),
430 units: 1,
431 }
432}
433
434pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
437where
438 T: MaybeSend + 'static,
439{
440 let (sender, receiver) = mpsc::channel(1);
441
442 let action = f(sender);
443
444 Task {
445 stream: Some(boxed_stream(
446 stream::once(async move { action })
447 .chain(receiver.map(|result| Action::Output(result))),
448 )),
449 units: 1,
450 }
451}
452
453pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
455 let action = action.into();
456
457 Task {
458 stream: Some(boxed_stream(stream::once(async move {
459 action.output().expect_err("no output")
460 }))),
461 units: 1,
462 }
463}
464
465pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
467 task.stream
468}
469
470pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
475where
476 T: Send + 'static,
477{
478 let (sender, receiver) = mpsc::channel(1);
479
480 let _ = thread::spawn(move || {
481 f(sender);
482 });
483
484 Task::stream(receiver)
485}
486
487pub fn try_blocking<T, E>(
493 f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
494) -> Task<Result<T, E>>
495where
496 T: Send + 'static,
497 E: Send + 'static,
498{
499 let (sender, receiver) = mpsc::channel(1);
500 let (error_sender, error_receiver) = oneshot::channel();
501
502 let _ = thread::spawn(move || {
503 if let Err(error) = f(sender) {
504 let _ = error_sender.send(Err(error));
505 }
506 });
507
508 Task::stream(stream::select(
509 receiver.map(Ok),
510 stream::once(error_receiver).filter_map(async |result| result.ok()),
511 ))
512}
513
/// Yields control back to the executor exactly once.
///
/// The first poll wakes the current waker and returns `Pending`; the second
/// poll completes.
async fn yield_now() {
    let mut yielded = false;

    std::future::poll_fn(move |cx: &mut task::Context<'_>| {
        if yielded {
            task::Poll::Ready(())
        } else {
            yielded = true;

            // Ask to be polled again right away before reporting `Pending`.
            cx.waker().wake_by_ref();

            task::Poll::Pending
        }
    })
    .await;
}