1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12
13#[cfg(feature = "sipper")]
14#[doc(no_inline)]
15pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
16
17#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
21pub struct Task<T> {
22 stream: Option<BoxStream<Action<T>>>,
23 units: usize,
24}
25
26impl<T> Task<T> {
27 pub fn none() -> Self {
29 Self {
30 stream: None,
31 units: 0,
32 }
33 }
34
35 pub fn done(value: T) -> Self
37 where
38 T: MaybeSend + 'static,
39 {
40 Self::future(future::ready(value))
41 }
42
43 pub fn perform<A>(
46 future: impl Future<Output = A> + MaybeSend + 'static,
47 f: impl FnOnce(A) -> T + MaybeSend + 'static,
48 ) -> Self
49 where
50 T: MaybeSend + 'static,
51 A: MaybeSend + 'static,
52 {
53 Self::future(future.map(f))
54 }
55
56 pub fn run<A>(
59 stream: impl Stream<Item = A> + MaybeSend + 'static,
60 f: impl Fn(A) -> T + MaybeSend + 'static,
61 ) -> Self
62 where
63 T: 'static,
64 {
65 Self::stream(stream.map(f))
66 }
67
68 #[cfg(feature = "sipper")]
71 pub fn sip<S>(
72 sipper: S,
73 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
74 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
75 ) -> Self
76 where
77 S: sipper::Core + MaybeSend + 'static,
78 T: MaybeSend + 'static,
79 {
80 Self::stream(stream(sipper::sipper(move |sender| async move {
81 on_output(sipper.with(on_progress).run(sender).await)
82 })))
83 }
84
85 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
88 where
89 T: 'static,
90 {
91 let mut select_all = stream::SelectAll::new();
92 let mut units = 0;
93
94 for task in tasks.into_iter() {
95 if let Some(stream) = task.stream {
96 select_all.push(stream);
97 }
98
99 units += task.units;
100 }
101
102 Self {
103 stream: Some(boxed_stream(select_all)),
104 units,
105 }
106 }
107
108 pub fn map<O>(
110 self,
111 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
112 ) -> Task<O>
113 where
114 T: MaybeSend + 'static,
115 O: MaybeSend + 'static,
116 {
117 self.then(move |output| Task::done(f(output)))
118 }
119
120 pub fn then<O>(
126 self,
127 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
128 ) -> Task<O>
129 where
130 T: MaybeSend + 'static,
131 O: MaybeSend + 'static,
132 {
133 Task {
134 stream: match self.stream {
135 None => None,
136 Some(stream) => {
137 Some(boxed_stream(stream.flat_map(move |action| {
138 match action.output() {
139 Ok(output) => {
140 f(output).stream.unwrap_or_else(|| {
141 boxed_stream(stream::empty())
142 })
143 }
144 Err(action) => boxed_stream(stream::once(
145 async move { action },
146 )),
147 }
148 })))
149 }
150 },
151 units: self.units,
152 }
153 }
154
155 pub fn chain(self, task: Self) -> Self
157 where
158 T: 'static,
159 {
160 match self.stream {
161 None => task,
162 Some(first) => match task.stream {
163 None => Self {
164 stream: Some(first),
165 units: self.units,
166 },
167 Some(second) => Self {
168 stream: Some(boxed_stream(first.chain(second))),
169 units: self.units + task.units,
170 },
171 },
172 }
173 }
174
175 pub fn collect(self) -> Task<Vec<T>>
177 where
178 T: MaybeSend + 'static,
179 {
180 match self.stream {
181 None => Task::done(Vec::new()),
182 Some(stream) => Task {
183 stream: Some(boxed_stream(
184 stream::unfold(
185 (stream, Some(Vec::new())),
186 move |(mut stream, outputs)| async move {
187 let mut outputs = outputs?;
188
189 let Some(action) = stream.next().await else {
190 return Some((
191 Some(Action::Output(outputs)),
192 (stream, None),
193 ));
194 };
195
196 match action.output() {
197 Ok(output) => {
198 outputs.push(output);
199
200 Some((None, (stream, Some(outputs))))
201 }
202 Err(action) => Some((
203 Some(action),
204 (stream, Some(outputs)),
205 )),
206 }
207 },
208 )
209 .filter_map(future::ready),
210 )),
211 units: self.units,
212 },
213 }
214 }
215
216 pub fn discard<O>(self) -> Task<O>
220 where
221 T: MaybeSend + 'static,
222 O: MaybeSend + 'static,
223 {
224 self.then(|_| Task::none())
225 }
226
227 pub fn abortable(self) -> (Self, Handle)
229 where
230 T: 'static,
231 {
232 let (stream, handle) = match self.stream {
233 Some(stream) => {
234 let (stream, handle) = stream::abortable(stream);
235
236 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
237 }
238 None => (
239 None,
240 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
241 ),
242 };
243
244 (
245 Self {
246 stream,
247 units: self.units,
248 },
249 Handle { internal: handle },
250 )
251 }
252
253 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
256 where
257 T: 'static,
258 {
259 Self::stream(stream::once(future))
260 }
261
262 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
265 where
266 T: 'static,
267 {
268 Self {
269 stream: Some(boxed_stream(stream.map(Action::Output))),
270 units: 1,
271 }
272 }
273
274 pub fn units(&self) -> usize {
276 self.units
277 }
278}
279
280impl<T> std::fmt::Debug for Task<T> {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
283 .field("units", &self.units)
284 .finish()
285 }
286}
287
288#[derive(Debug, Clone)]
290pub struct Handle {
291 internal: InternalHandle,
292}
293
294#[derive(Debug, Clone)]
295enum InternalHandle {
296 Manual(stream::AbortHandle),
297 AbortOnDrop(Arc<stream::AbortHandle>),
298}
299
300impl InternalHandle {
301 pub fn as_ref(&self) -> &stream::AbortHandle {
302 match self {
303 InternalHandle::Manual(handle) => handle,
304 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
305 }
306 }
307}
308
309impl Handle {
310 pub fn abort(&self) {
312 self.internal.as_ref().abort();
313 }
314
315 pub fn abort_on_drop(self) -> Self {
324 match &self.internal {
325 InternalHandle::Manual(handle) => Self {
326 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
327 },
328 InternalHandle::AbortOnDrop(_) => self,
329 }
330 }
331
332 pub fn is_aborted(&self) -> bool {
334 self.internal.as_ref().is_aborted()
335 }
336}
337
338impl Drop for Handle {
339 fn drop(&mut self) {
340 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
341 let handle = std::mem::replace(
342 handle,
343 Arc::new(stream::AbortHandle::new_pair().0),
344 );
345
346 if let Some(handle) = Arc::into_inner(handle) {
347 handle.abort();
348 }
349 }
350 }
351}
352
353impl<T> Task<Option<T>> {
354 pub fn and_then<A>(
358 self,
359 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
360 ) -> Task<A>
361 where
362 T: MaybeSend + 'static,
363 A: MaybeSend + 'static,
364 {
365 self.then(move |option| option.map_or_else(Task::none, &f))
366 }
367}
368
369impl<T, E> Task<Result<T, E>> {
370 pub fn and_then<A>(
374 self,
375 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
376 ) -> Task<A>
377 where
378 T: MaybeSend + 'static,
379 E: MaybeSend + 'static,
380 A: MaybeSend + 'static,
381 {
382 self.then(move |option| option.map_or_else(|_| Task::none(), &f))
383 }
384}
385
386impl<T> From<()> for Task<T> {
387 fn from(_value: ()) -> Self {
388 Self::none()
389 }
390}
391
392pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
395where
396 T: Send + 'static,
397{
398 channel(move |sender| {
399 let operation =
400 widget::operation::map(Box::new(operation), move |value| {
401 let _ = sender.clone().try_send(value);
402 });
403
404 Action::Widget(Box::new(operation))
405 })
406}
407
408pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
411where
412 T: MaybeSend + 'static,
413{
414 let (sender, receiver) = oneshot::channel();
415
416 let action = f(sender);
417
418 Task {
419 stream: Some(boxed_stream(stream::once(async move { action }).chain(
420 receiver.into_stream().filter_map(|result| async move {
421 Some(Action::Output(result.ok()?))
422 }),
423 ))),
424 units: 1,
425 }
426}
427
428pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
431where
432 T: MaybeSend + 'static,
433{
434 let (sender, receiver) = mpsc::channel(1);
435
436 let action = f(sender);
437
438 Task {
439 stream: Some(boxed_stream(
440 stream::once(async move { action })
441 .chain(receiver.map(|result| Action::Output(result))),
442 )),
443 units: 1,
444 }
445}
446
447pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
449 let action = action.into();
450
451 Task {
452 stream: Some(boxed_stream(stream::once(async move {
453 action.output().expect_err("no output")
454 }))),
455 units: 1,
456 }
457}
458
459pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
461 task.stream
462}