1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task;
14use std::thread;
15
16#[cfg(feature = "sipper")]
17#[doc(no_inline)]
18pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
19
20#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
24pub struct Task<T> {
25 stream: Option<BoxStream<Action<T>>>,
26 units: usize,
27}
28
29impl<T> Task<T> {
30 pub fn none() -> Self {
32 Self {
33 stream: None,
34 units: 0,
35 }
36 }
37
38 pub fn done(value: T) -> Self
40 where
41 T: MaybeSend + 'static,
42 {
43 Self::future(future::ready(value))
44 }
45
46 pub fn perform<A>(
49 future: impl Future<Output = A> + MaybeSend + 'static,
50 f: impl FnOnce(A) -> T + MaybeSend + 'static,
51 ) -> Self
52 where
53 T: MaybeSend + 'static,
54 A: MaybeSend + 'static,
55 {
56 Self::future(future.map(f))
57 }
58
59 pub fn run<A>(
62 stream: impl Stream<Item = A> + MaybeSend + 'static,
63 f: impl Fn(A) -> T + MaybeSend + 'static,
64 ) -> Self
65 where
66 T: 'static,
67 {
68 Self::stream(stream.map(f))
69 }
70
71 #[cfg(feature = "sipper")]
74 pub fn sip<S>(
75 sipper: S,
76 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
77 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
78 ) -> Self
79 where
80 S: sipper::Core + MaybeSend + 'static,
81 T: MaybeSend + 'static,
82 {
83 Self::stream(stream(sipper::sipper(move |sender| async move {
84 on_output(sipper.with(on_progress).run(sender).await)
85 })))
86 }
87
88 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
91 where
92 T: 'static,
93 {
94 let mut select_all = stream::SelectAll::new();
95 let mut units = 0;
96
97 for task in tasks.into_iter() {
98 if let Some(stream) = task.stream {
99 select_all.push(stream);
100 }
101
102 units += task.units;
103 }
104
105 Self {
106 stream: Some(boxed_stream(select_all)),
107 units,
108 }
109 }
110
111 pub fn map<O>(self, mut f: impl FnMut(T) -> O + MaybeSend + 'static) -> Task<O>
113 where
114 T: MaybeSend + 'static,
115 O: MaybeSend + 'static,
116 {
117 self.then(move |output| Task::done(f(output)))
118 }
119
120 pub fn then<O>(self, mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static) -> Task<O>
126 where
127 T: MaybeSend + 'static,
128 O: MaybeSend + 'static,
129 {
130 Task {
131 stream: match self.stream {
132 None => None,
133 Some(stream) => Some(boxed_stream(stream.flat_map(move |action| {
134 match action.output() {
135 Ok(output) => f(output)
136 .stream
137 .unwrap_or_else(|| boxed_stream(stream::empty())),
138 Err(action) => boxed_stream(stream::once(async move { action })),
139 }
140 }))),
141 },
142 units: self.units,
143 }
144 }
145
146 pub fn chain(self, task: Self) -> Self
148 where
149 T: 'static,
150 {
151 match self.stream {
152 None => task,
153 Some(first) => match task.stream {
154 None => Self {
155 stream: Some(first),
156 units: self.units,
157 },
158 Some(second) => Self {
159 stream: Some(boxed_stream(first.chain(second))),
160 units: self.units + task.units,
161 },
162 },
163 }
164 }
165
166 pub fn collect(self) -> Task<Vec<T>>
168 where
169 T: MaybeSend + 'static,
170 {
171 match self.stream {
172 None => Task::done(Vec::new()),
173 Some(stream) => Task {
174 stream: Some(boxed_stream(
175 stream::unfold(
176 (stream, Some(Vec::new())),
177 move |(mut stream, outputs)| async move {
178 let mut outputs = outputs?;
179
180 let Some(action) = stream.next().await else {
181 return Some((Some(Action::Output(outputs)), (stream, None)));
182 };
183
184 match action.output() {
185 Ok(output) => {
186 outputs.push(output);
187
188 Some((None, (stream, Some(outputs))))
189 }
190 Err(action) => Some((Some(action), (stream, Some(outputs)))),
191 }
192 },
193 )
194 .filter_map(future::ready),
195 )),
196 units: self.units,
197 },
198 }
199 }
200
201 pub fn discard<O>(self) -> Task<O>
205 where
206 T: MaybeSend + 'static,
207 O: MaybeSend + 'static,
208 {
209 self.then(|_| Task::none())
210 }
211
212 pub fn abortable(self) -> (Self, Handle)
214 where
215 T: 'static,
216 {
217 let (stream, handle) = match self.stream {
218 Some(stream) => {
219 let (stream, handle) = stream::abortable(stream);
220
221 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
222 }
223 None => (
224 None,
225 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
226 ),
227 };
228
229 (
230 Self {
231 stream,
232 units: self.units,
233 },
234 Handle { internal: handle },
235 )
236 }
237
238 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
241 where
242 T: 'static,
243 {
244 Self::stream(stream::once(future))
245 }
246
247 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
250 where
251 T: 'static,
252 {
253 Self {
254 stream: Some(boxed_stream(
255 stream::once(yield_now())
256 .filter_map(|_| async { None })
257 .chain(stream.map(Action::Output)),
258 )),
259 units: 1,
260 }
261 }
262
263 pub fn units(&self) -> usize {
265 self.units
266 }
267}
268
269impl<T> std::fmt::Debug for Task<T> {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
272 .field("units", &self.units)
273 .finish()
274 }
275}
276
277#[derive(Debug, Clone)]
279pub struct Handle {
280 internal: InternalHandle,
281}
282
283#[derive(Debug, Clone)]
284enum InternalHandle {
285 Manual(stream::AbortHandle),
286 AbortOnDrop(Arc<stream::AbortHandle>),
287}
288
289impl InternalHandle {
290 pub fn as_ref(&self) -> &stream::AbortHandle {
291 match self {
292 InternalHandle::Manual(handle) => handle,
293 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
294 }
295 }
296}
297
298impl Handle {
299 pub fn abort(&self) {
301 self.internal.as_ref().abort();
302 }
303
304 pub fn abort_on_drop(self) -> Self {
313 match &self.internal {
314 InternalHandle::Manual(handle) => Self {
315 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
316 },
317 InternalHandle::AbortOnDrop(_) => self,
318 }
319 }
320
321 pub fn is_aborted(&self) -> bool {
323 self.internal.as_ref().is_aborted()
324 }
325}
326
327impl Drop for Handle {
328 fn drop(&mut self) {
329 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
330 let handle = std::mem::replace(handle, Arc::new(stream::AbortHandle::new_pair().0));
331
332 if let Some(handle) = Arc::into_inner(handle) {
333 handle.abort();
334 }
335 }
336 }
337}
338
339impl<T> Task<Option<T>> {
340 pub fn and_then<A>(self, f: impl Fn(T) -> Task<A> + MaybeSend + 'static) -> Task<A>
344 where
345 T: MaybeSend + 'static,
346 A: MaybeSend + 'static,
347 {
348 self.then(move |option| option.map_or_else(Task::none, &f))
349 }
350}
351
352impl<T, E> Task<Result<T, E>> {
353 pub fn and_then<A>(
357 self,
358 f: impl Fn(T) -> Task<Result<A, E>> + MaybeSend + 'static,
359 ) -> Task<Result<A, E>>
360 where
361 T: MaybeSend + 'static,
362 E: MaybeSend + 'static,
363 A: MaybeSend + 'static,
364 {
365 self.then(move |result| result.map_or_else(|error| Task::done(Err(error)), &f))
366 }
367
368 pub fn map_err<E2>(self, f: impl Fn(E) -> E2 + MaybeSend + 'static) -> Task<Result<T, E2>>
371 where
372 T: MaybeSend + 'static,
373 E: MaybeSend + 'static,
374 E2: MaybeSend + 'static,
375 {
376 self.map(move |result| result.map_err(&f))
377 }
378}
379
380impl<T> Default for Task<T> {
381 fn default() -> Self {
382 Self::none()
383 }
384}
385
386impl<T> From<()> for Task<T> {
387 fn from(_value: ()) -> Self {
388 Self::none()
389 }
390}
391
392pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
395where
396 T: Send + 'static,
397{
398 channel(move |sender| {
399 let operation = widget::operation::map(Box::new(operation), move |value| {
400 let _ = sender.clone().try_send(value);
401 });
402
403 Action::Widget(Box::new(operation))
404 })
405}
406
407pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
410where
411 T: MaybeSend + 'static,
412{
413 let (sender, receiver) = oneshot::channel();
414
415 let action = f(sender);
416
417 Task {
418 stream: Some(boxed_stream(
419 stream::once(async move { action }).chain(
420 receiver
421 .into_stream()
422 .filter_map(|result| async move { Some(Action::Output(result.ok()?)) }),
423 ),
424 )),
425 units: 1,
426 }
427}
428
429pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
432where
433 T: MaybeSend + 'static,
434{
435 let (sender, receiver) = mpsc::channel(1);
436
437 let action = f(sender);
438
439 Task {
440 stream: Some(boxed_stream(
441 stream::once(async move { action })
442 .chain(receiver.map(|result| Action::Output(result))),
443 )),
444 units: 1,
445 }
446}
447
448pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
450 let action = action.into();
451
452 Task {
453 stream: Some(boxed_stream(stream::once(async move {
454 action.output().expect_err("no output")
455 }))),
456 units: 1,
457 }
458}
459
460pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
462 task.stream
463}
464
465pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
470where
471 T: Send + 'static,
472{
473 let (sender, receiver) = mpsc::channel(1);
474
475 let _ = thread::spawn(move || {
476 f(sender);
477 });
478
479 Task::stream(receiver)
480}
481
482pub fn try_blocking<T, E>(
488 f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
489) -> Task<Result<T, E>>
490where
491 T: Send + 'static,
492 E: Send + 'static,
493{
494 let (sender, receiver) = mpsc::channel(1);
495 let (error_sender, error_receiver) = oneshot::channel();
496
497 let _ = thread::spawn(move || {
498 if let Err(error) = f(sender) {
499 let _ = error_sender.send(Err(error));
500 }
501 });
502
503 Task::stream(stream::select(
504 receiver.map(Ok),
505 stream::once(error_receiver).filter_map(async |result| result.ok()),
506 ))
507}
508
/// Yields control back to the executor exactly once.
///
/// The first poll wakes the current waker and returns `Pending`; every
/// later poll returns `Ready`.
async fn yield_now() {
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<()> {
            // Set the flag and look at its previous value in one step.
            let already_yielded = std::mem::replace(&mut self.yielded, true);

            if already_yielded {
                task::Poll::Ready(())
            } else {
                // Ask to be polled again right away.
                cx.waker().wake_by_ref();

                task::Poll::Pending
            }
        }
    }

    let pending_once = YieldNow { yielded: false };

    pending_once.await;
}