1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12use std::thread;
13
14#[cfg(feature = "sipper")]
15#[doc(no_inline)]
16pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
17
/// A unit of asynchronous work that produces values of type `T`.
///
/// A `Task` wraps an optional boxed stream of [`Action`]s together with a
/// count of the work units it represents.
#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
pub struct Task<T> {
    /// The actions yielded by this task, or `None` when there is nothing to run.
    stream: Option<BoxStream<Action<T>>>,
    /// The number of work units this task accounts for; see [`Task::units`].
    units: usize,
}
26
27impl<T> Task<T> {
28 pub fn none() -> Self {
30 Self {
31 stream: None,
32 units: 0,
33 }
34 }
35
36 pub fn done(value: T) -> Self
38 where
39 T: MaybeSend + 'static,
40 {
41 Self::future(future::ready(value))
42 }
43
44 pub fn perform<A>(
47 future: impl Future<Output = A> + MaybeSend + 'static,
48 f: impl FnOnce(A) -> T + MaybeSend + 'static,
49 ) -> Self
50 where
51 T: MaybeSend + 'static,
52 A: MaybeSend + 'static,
53 {
54 Self::future(future.map(f))
55 }
56
57 pub fn run<A>(
60 stream: impl Stream<Item = A> + MaybeSend + 'static,
61 f: impl Fn(A) -> T + MaybeSend + 'static,
62 ) -> Self
63 where
64 T: 'static,
65 {
66 Self::stream(stream.map(f))
67 }
68
69 #[cfg(feature = "sipper")]
72 pub fn sip<S>(
73 sipper: S,
74 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
75 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
76 ) -> Self
77 where
78 S: sipper::Core + MaybeSend + 'static,
79 T: MaybeSend + 'static,
80 {
81 Self::stream(stream(sipper::sipper(move |sender| async move {
82 on_output(sipper.with(on_progress).run(sender).await)
83 })))
84 }
85
86 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
89 where
90 T: 'static,
91 {
92 let mut select_all = stream::SelectAll::new();
93 let mut units = 0;
94
95 for task in tasks.into_iter() {
96 if let Some(stream) = task.stream {
97 select_all.push(stream);
98 }
99
100 units += task.units;
101 }
102
103 Self {
104 stream: Some(boxed_stream(select_all)),
105 units,
106 }
107 }
108
109 pub fn map<O>(
111 self,
112 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
113 ) -> Task<O>
114 where
115 T: MaybeSend + 'static,
116 O: MaybeSend + 'static,
117 {
118 self.then(move |output| Task::done(f(output)))
119 }
120
121 pub fn then<O>(
127 self,
128 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
129 ) -> Task<O>
130 where
131 T: MaybeSend + 'static,
132 O: MaybeSend + 'static,
133 {
134 Task {
135 stream: match self.stream {
136 None => None,
137 Some(stream) => {
138 Some(boxed_stream(stream.flat_map(move |action| {
139 match action.output() {
140 Ok(output) => {
141 f(output).stream.unwrap_or_else(|| {
142 boxed_stream(stream::empty())
143 })
144 }
145 Err(action) => boxed_stream(stream::once(
146 async move { action },
147 )),
148 }
149 })))
150 }
151 },
152 units: self.units,
153 }
154 }
155
156 pub fn chain(self, task: Self) -> Self
158 where
159 T: 'static,
160 {
161 match self.stream {
162 None => task,
163 Some(first) => match task.stream {
164 None => Self {
165 stream: Some(first),
166 units: self.units,
167 },
168 Some(second) => Self {
169 stream: Some(boxed_stream(first.chain(second))),
170 units: self.units + task.units,
171 },
172 },
173 }
174 }
175
    /// Gathers every output of this [`Task`] into a single `Vec`.
    ///
    /// The returned task produces the collected `Vec` once the underlying
    /// stream ends. Actions for which `Action::output` returns `Err` are
    /// forwarded as they arrive instead of being collected.
    pub fn collect(self) -> Task<Vec<T>>
    where
        T: MaybeSend + 'static,
    {
        match self.stream {
            // Nothing to run: produce an empty collection right away.
            None => Task::done(Vec::new()),
            Some(stream) => Task {
                stream: Some(boxed_stream(
                    // The unfold state is the source stream plus the
                    // accumulator. The accumulator turns into `None` once
                    // the final `Vec` has been emitted.
                    stream::unfold(
                        (stream, Some(Vec::new())),
                        move |(mut stream, outputs)| async move {
                            // `None` means the result was already emitted;
                            // `?` then ends the unfold.
                            let mut outputs = outputs?;

                            // Source exhausted: emit everything collected so
                            // far and mark the state as finished.
                            let Some(action) = stream.next().await else {
                                return Some((
                                    Some(Action::Output(outputs)),
                                    (stream, None),
                                ));
                            };

                            match action.output() {
                                Ok(output) => {
                                    outputs.push(output);

                                    // Yield `None` so `filter_map` below
                                    // skips this step without ending the
                                    // stream.
                                    Some((None, (stream, Some(outputs))))
                                }
                                // Not an output: forward the action itself.
                                Err(action) => Some((
                                    Some(action),
                                    (stream, Some(outputs)),
                                )),
                            }
                        },
                    )
                    // Drop the `None` placeholders produced while collecting.
                    .filter_map(future::ready),
                )),
                units: self.units,
            },
        }
    }
216
217 pub fn discard<O>(self) -> Task<O>
221 where
222 T: MaybeSend + 'static,
223 O: MaybeSend + 'static,
224 {
225 self.then(|_| Task::none())
226 }
227
228 pub fn abortable(self) -> (Self, Handle)
230 where
231 T: 'static,
232 {
233 let (stream, handle) = match self.stream {
234 Some(stream) => {
235 let (stream, handle) = stream::abortable(stream);
236
237 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
238 }
239 None => (
240 None,
241 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
242 ),
243 };
244
245 (
246 Self {
247 stream,
248 units: self.units,
249 },
250 Handle { internal: handle },
251 )
252 }
253
254 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
257 where
258 T: 'static,
259 {
260 Self::stream(stream::once(future))
261 }
262
263 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
266 where
267 T: 'static,
268 {
269 Self {
270 stream: Some(boxed_stream(stream.map(Action::Output))),
271 units: 1,
272 }
273 }
274
    /// Returns the number of work units this [`Task`] accounts for.
    pub fn units(&self) -> usize {
        self.units
    }
279}
280
281impl<T> std::fmt::Debug for Task<T> {
282 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283 f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
284 .field("units", &self.units)
285 .finish()
286 }
287}
288
/// A handle that can abort the [`Task`] it was created for; see
/// [`Task::abortable`].
#[derive(Debug, Clone)]
pub struct Handle {
    /// The underlying abort handle and the policy applied when dropping.
    internal: InternalHandle,
}
294
/// The abort policy backing a [`Handle`].
#[derive(Debug, Clone)]
enum InternalHandle {
    /// Aborts only through an explicit [`Handle::abort`] call.
    Manual(stream::AbortHandle),
    /// Also aborts when the last [`Handle`] sharing this `Arc` is dropped.
    AbortOnDrop(Arc<stream::AbortHandle>),
}
300
301impl InternalHandle {
302 pub fn as_ref(&self) -> &stream::AbortHandle {
303 match self {
304 InternalHandle::Manual(handle) => handle,
305 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
306 }
307 }
308}
309
310impl Handle {
311 pub fn abort(&self) {
313 self.internal.as_ref().abort();
314 }
315
316 pub fn abort_on_drop(self) -> Self {
325 match &self.internal {
326 InternalHandle::Manual(handle) => Self {
327 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
328 },
329 InternalHandle::AbortOnDrop(_) => self,
330 }
331 }
332
333 pub fn is_aborted(&self) -> bool {
335 self.internal.as_ref().is_aborted()
336 }
337}
338
impl Drop for Handle {
    /// Aborts the task when the last abort-on-drop [`Handle`] goes away.
    ///
    /// Handles in the `Manual` state are left untouched.
    fn drop(&mut self) {
        if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
            // `drop` only gets `&mut self`, so swap in a throwaway handle to
            // take ownership of the shared `Arc`.
            let handle = std::mem::replace(
                handle,
                Arc::new(stream::AbortHandle::new_pair().0),
            );

            // `Arc::into_inner` yields `Some` only for the last remaining
            // reference, so the abort fires once every clone is gone.
            if let Some(handle) = Arc::into_inner(handle) {
                handle.abort();
            }
        }
    }
}
353
354impl<T> Task<Option<T>> {
355 pub fn and_then<A>(
359 self,
360 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
361 ) -> Task<A>
362 where
363 T: MaybeSend + 'static,
364 A: MaybeSend + 'static,
365 {
366 self.then(move |option| option.map_or_else(Task::none, &f))
367 }
368}
369
370impl<T, E> Task<Result<T, E>> {
371 pub fn and_then<A>(
375 self,
376 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
377 ) -> Task<A>
378 where
379 T: MaybeSend + 'static,
380 E: MaybeSend + 'static,
381 A: MaybeSend + 'static,
382 {
383 self.then(move |option| option.map_or_else(|_| Task::none(), &f))
384 }
385}
386
387impl<T> Default for Task<T> {
388 fn default() -> Self {
389 Self::none()
390 }
391}
392
393impl<T> From<()> for Task<T> {
394 fn from(_value: ()) -> Self {
395 Self::none()
396 }
397}
398
399pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
402where
403 T: Send + 'static,
404{
405 channel(move |sender| {
406 let operation =
407 widget::operation::map(Box::new(operation), move |value| {
408 let _ = sender.clone().try_send(value);
409 });
410
411 Action::Widget(Box::new(operation))
412 })
413}
414
415pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
418where
419 T: MaybeSend + 'static,
420{
421 let (sender, receiver) = oneshot::channel();
422
423 let action = f(sender);
424
425 Task {
426 stream: Some(boxed_stream(stream::once(async move { action }).chain(
427 receiver.into_stream().filter_map(|result| async move {
428 Some(Action::Output(result.ok()?))
429 }),
430 ))),
431 units: 1,
432 }
433}
434
435pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
438where
439 T: MaybeSend + 'static,
440{
441 let (sender, receiver) = mpsc::channel(1);
442
443 let action = f(sender);
444
445 Task {
446 stream: Some(boxed_stream(
447 stream::once(async move { action })
448 .chain(receiver.map(|result| Action::Output(result))),
449 )),
450 units: 1,
451 }
452}
453
454pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
456 let action = action.into();
457
458 Task {
459 stream: Some(boxed_stream(stream::once(async move {
460 action.output().expect_err("no output")
461 }))),
462 units: 1,
463 }
464}
465
466pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
468 task.stream
469}
470
471pub fn blocking<T>(f: impl FnOnce(mpsc::Sender<T>) + Send + 'static) -> Task<T>
476where
477 T: Send + 'static,
478{
479 let (sender, receiver) = mpsc::channel(1);
480
481 let _ = thread::spawn(move || {
482 f(sender);
483 });
484
485 Task::stream(receiver)
486}
487
488pub fn try_blocking<T, E>(
494 f: impl FnOnce(mpsc::Sender<T>) -> Result<(), E> + Send + 'static,
495) -> Task<Result<T, E>>
496where
497 T: Send + 'static,
498 E: Send + 'static,
499{
500 let (sender, receiver) = mpsc::channel(1);
501 let (error_sender, error_receiver) = oneshot::channel();
502
503 let _ = thread::spawn(move || {
504 if let Err(error) = f(sender) {
505 let _ = error_sender.send(Err(error));
506 }
507 });
508
509 Task::stream(stream::select(
510 receiver.map(Ok),
511 stream::once(error_receiver).filter_map(async |result| result.ok()),
512 ))
513}