1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::convert::Infallible;
11use std::sync::Arc;
12
13#[cfg(feature = "sipper")]
14#[doc(no_inline)]
15pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
16
17#[allow(missing_debug_implementations)]
21#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
22pub struct Task<T> {
23 stream: Option<BoxStream<Action<T>>>,
24 units: usize,
25}
26
27impl<T> Task<T> {
28 pub fn none() -> Self {
30 Self {
31 stream: None,
32 units: 0,
33 }
34 }
35
36 pub fn done(value: T) -> Self
38 where
39 T: MaybeSend + 'static,
40 {
41 Self::future(future::ready(value))
42 }
43
44 pub fn perform<A>(
47 future: impl Future<Output = A> + MaybeSend + 'static,
48 f: impl FnOnce(A) -> T + MaybeSend + 'static,
49 ) -> Self
50 where
51 T: MaybeSend + 'static,
52 A: MaybeSend + 'static,
53 {
54 Self::future(future.map(f))
55 }
56
57 pub fn run<A>(
60 stream: impl Stream<Item = A> + MaybeSend + 'static,
61 f: impl Fn(A) -> T + MaybeSend + 'static,
62 ) -> Self
63 where
64 T: 'static,
65 {
66 Self::stream(stream.map(f))
67 }
68
69 #[cfg(feature = "sipper")]
72 pub fn sip<S>(
73 sipper: S,
74 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
75 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
76 ) -> Self
77 where
78 S: sipper::Core + MaybeSend + 'static,
79 T: MaybeSend + 'static,
80 {
81 Self::stream(stream(sipper::sipper(move |sender| async move {
82 on_output(sipper.with(on_progress).run(sender).await)
83 })))
84 }
85
86 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
89 where
90 T: 'static,
91 {
92 let mut select_all = stream::SelectAll::new();
93 let mut units = 0;
94
95 for task in tasks.into_iter() {
96 if let Some(stream) = task.stream {
97 select_all.push(stream);
98 }
99
100 units += task.units;
101 }
102
103 Self {
104 stream: Some(boxed_stream(select_all)),
105 units,
106 }
107 }
108
109 pub fn map<O>(
111 self,
112 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
113 ) -> Task<O>
114 where
115 T: MaybeSend + 'static,
116 O: MaybeSend + 'static,
117 {
118 self.then(move |output| Task::done(f(output)))
119 }
120
121 pub fn then<O>(
127 self,
128 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
129 ) -> Task<O>
130 where
131 T: MaybeSend + 'static,
132 O: MaybeSend + 'static,
133 {
134 Task {
135 stream: match self.stream {
136 None => None,
137 Some(stream) => {
138 Some(boxed_stream(stream.flat_map(move |action| {
139 match action.output() {
140 Ok(output) => {
141 f(output).stream.unwrap_or_else(|| {
142 boxed_stream(stream::empty())
143 })
144 }
145 Err(action) => boxed_stream(stream::once(
146 async move { action },
147 )),
148 }
149 })))
150 }
151 },
152 units: self.units,
153 }
154 }
155
156 pub fn chain(self, task: Self) -> Self
158 where
159 T: 'static,
160 {
161 match self.stream {
162 None => task,
163 Some(first) => match task.stream {
164 None => Self {
165 stream: Some(first),
166 units: self.units,
167 },
168 Some(second) => Self {
169 stream: Some(boxed_stream(first.chain(second))),
170 units: self.units + task.units,
171 },
172 },
173 }
174 }
175
176 pub fn collect(self) -> Task<Vec<T>>
178 where
179 T: MaybeSend + 'static,
180 {
181 match self.stream {
182 None => Task::done(Vec::new()),
183 Some(stream) => Task {
184 stream: Some(boxed_stream(
185 stream::unfold(
186 (stream, Some(Vec::new())),
187 move |(mut stream, outputs)| async move {
188 let mut outputs = outputs?;
189
190 let Some(action) = stream.next().await else {
191 return Some((
192 Some(Action::Output(outputs)),
193 (stream, None),
194 ));
195 };
196
197 match action.output() {
198 Ok(output) => {
199 outputs.push(output);
200
201 Some((None, (stream, Some(outputs))))
202 }
203 Err(action) => Some((
204 Some(action),
205 (stream, Some(outputs)),
206 )),
207 }
208 },
209 )
210 .filter_map(future::ready),
211 )),
212 units: self.units,
213 },
214 }
215 }
216
217 pub fn discard<O>(self) -> Task<O>
221 where
222 T: MaybeSend + 'static,
223 O: MaybeSend + 'static,
224 {
225 self.then(|_| Task::none())
226 }
227
228 pub fn abortable(self) -> (Self, Handle)
230 where
231 T: 'static,
232 {
233 let (stream, handle) = match self.stream {
234 Some(stream) => {
235 let (stream, handle) = stream::abortable(stream);
236
237 (Some(boxed_stream(stream)), InternalHandle::Manual(handle))
238 }
239 None => (
240 None,
241 InternalHandle::Manual(stream::AbortHandle::new_pair().0),
242 ),
243 };
244
245 (
246 Self {
247 stream,
248 units: self.units,
249 },
250 Handle { internal: handle },
251 )
252 }
253
254 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
257 where
258 T: 'static,
259 {
260 Self::stream(stream::once(future))
261 }
262
263 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
266 where
267 T: 'static,
268 {
269 Self {
270 stream: Some(boxed_stream(stream.map(Action::Output))),
271 units: 1,
272 }
273 }
274
275 pub fn units(&self) -> usize {
277 self.units
278 }
279}
280
281#[derive(Debug, Clone)]
283pub struct Handle {
284 internal: InternalHandle,
285}
286
287#[derive(Debug, Clone)]
288enum InternalHandle {
289 Manual(stream::AbortHandle),
290 AbortOnDrop(Arc<stream::AbortHandle>),
291}
292
293impl InternalHandle {
294 pub fn as_ref(&self) -> &stream::AbortHandle {
295 match self {
296 InternalHandle::Manual(handle) => handle,
297 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
298 }
299 }
300}
301
302impl Handle {
303 pub fn abort(&self) {
305 self.internal.as_ref().abort();
306 }
307
308 pub fn abort_on_drop(self) -> Self {
317 match &self.internal {
318 InternalHandle::Manual(handle) => Self {
319 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
320 },
321 InternalHandle::AbortOnDrop(_) => self,
322 }
323 }
324
325 pub fn is_aborted(&self) -> bool {
327 self.internal.as_ref().is_aborted()
328 }
329}
330
331impl Drop for Handle {
332 fn drop(&mut self) {
333 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
334 let handle = std::mem::replace(
335 handle,
336 Arc::new(stream::AbortHandle::new_pair().0),
337 );
338
339 if let Some(handle) = Arc::into_inner(handle) {
340 handle.abort();
341 }
342 }
343 }
344}
345
346impl<T> Task<Option<T>> {
347 pub fn and_then<A>(
351 self,
352 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
353 ) -> Task<A>
354 where
355 T: MaybeSend + 'static,
356 A: MaybeSend + 'static,
357 {
358 self.then(move |option| option.map_or_else(Task::none, &f))
359 }
360}
361
362impl<T, E> Task<Result<T, E>> {
363 pub fn and_then<A>(
367 self,
368 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
369 ) -> Task<A>
370 where
371 T: MaybeSend + 'static,
372 E: MaybeSend + 'static,
373 A: MaybeSend + 'static,
374 {
375 self.then(move |option| option.map_or_else(|_| Task::none(), &f))
376 }
377}
378
379impl<T> From<()> for Task<T> {
380 fn from(_value: ()) -> Self {
381 Self::none()
382 }
383}
384
385pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
388where
389 T: Send + 'static,
390{
391 channel(move |sender| {
392 let operation =
393 widget::operation::map(Box::new(operation), move |value| {
394 let _ = sender.clone().try_send(value);
395 });
396
397 Action::Widget(Box::new(operation))
398 })
399}
400
401pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
404where
405 T: MaybeSend + 'static,
406{
407 let (sender, receiver) = oneshot::channel();
408
409 let action = f(sender);
410
411 Task {
412 stream: Some(boxed_stream(stream::once(async move { action }).chain(
413 receiver.into_stream().filter_map(|result| async move {
414 Some(Action::Output(result.ok()?))
415 }),
416 ))),
417 units: 1,
418 }
419}
420
421pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
424where
425 T: MaybeSend + 'static,
426{
427 let (sender, receiver) = mpsc::channel(1);
428
429 let action = f(sender);
430
431 Task {
432 stream: Some(boxed_stream(
433 stream::once(async move { action })
434 .chain(receiver.map(|result| Action::Output(result))),
435 )),
436 units: 1,
437 }
438}
439
440pub fn effect<T>(action: impl Into<Action<Infallible>>) -> Task<T> {
442 let action = action.into();
443
444 Task {
445 stream: Some(boxed_stream(stream::once(async move {
446 action.output().expect_err("no output")
447 }))),
448 units: 1,
449 }
450}
451
452pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
454 task.stream
455}