1use crate::Action;
3use crate::core::widget;
4use crate::futures::futures::channel::mpsc;
5use crate::futures::futures::channel::oneshot;
6use crate::futures::futures::future::{self, FutureExt};
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{BoxStream, MaybeSend, boxed_stream};
9
10use std::sync::Arc;
11
12#[doc(no_inline)]
13pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
14
15#[allow(missing_debug_implementations)]
19#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
20pub struct Task<T>(Option<BoxStream<Action<T>>>);
21
22impl<T> Task<T> {
23 pub fn none() -> Self {
25 Self(None)
26 }
27
28 pub fn done(value: T) -> Self
30 where
31 T: MaybeSend + 'static,
32 {
33 Self::future(future::ready(value))
34 }
35
36 pub fn perform<A>(
39 future: impl Future<Output = A> + MaybeSend + 'static,
40 f: impl FnOnce(A) -> T + MaybeSend + 'static,
41 ) -> Self
42 where
43 T: MaybeSend + 'static,
44 A: MaybeSend + 'static,
45 {
46 Self::future(future.map(f))
47 }
48
49 pub fn run<A>(
52 stream: impl Stream<Item = A> + MaybeSend + 'static,
53 f: impl Fn(A) -> T + MaybeSend + 'static,
54 ) -> Self
55 where
56 T: 'static,
57 {
58 Self::stream(stream.map(f))
59 }
60
61 pub fn sip<S>(
64 sipper: S,
65 on_progress: impl FnMut(S::Progress) -> T + MaybeSend + 'static,
66 on_output: impl FnOnce(<S as Future>::Output) -> T + MaybeSend + 'static,
67 ) -> Self
68 where
69 S: sipper::Core + MaybeSend + 'static,
70 T: MaybeSend + 'static,
71 {
72 Self::stream(stream(sipper::sipper(move |sender| async move {
73 on_output(sipper.with(on_progress).run(sender).await)
74 })))
75 }
76
77 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
80 where
81 T: 'static,
82 {
83 Self(Some(boxed_stream(stream::select_all(
84 tasks.into_iter().filter_map(|task| task.0),
85 ))))
86 }
87
88 pub fn map<O>(
90 self,
91 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
92 ) -> Task<O>
93 where
94 T: MaybeSend + 'static,
95 O: MaybeSend + 'static,
96 {
97 self.then(move |output| Task::done(f(output)))
98 }
99
100 pub fn then<O>(
106 self,
107 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
108 ) -> Task<O>
109 where
110 T: MaybeSend + 'static,
111 O: MaybeSend + 'static,
112 {
113 Task(match self.0 {
114 None => None,
115 Some(stream) => {
116 Some(boxed_stream(stream.flat_map(move |action| {
117 match action.output() {
118 Ok(output) => f(output)
119 .0
120 .unwrap_or_else(|| boxed_stream(stream::empty())),
121 Err(action) => {
122 boxed_stream(stream::once(async move { action }))
123 }
124 }
125 })))
126 }
127 })
128 }
129
130 pub fn chain(self, task: Self) -> Self
132 where
133 T: 'static,
134 {
135 match self.0 {
136 None => task,
137 Some(first) => match task.0 {
138 None => Task(Some(first)),
139 Some(second) => Task(Some(boxed_stream(first.chain(second)))),
140 },
141 }
142 }
143
144 pub fn collect(self) -> Task<Vec<T>>
146 where
147 T: MaybeSend + 'static,
148 {
149 match self.0 {
150 None => Task::done(Vec::new()),
151 Some(stream) => Task(Some(boxed_stream(
152 stream::unfold(
153 (stream, Some(Vec::new())),
154 move |(mut stream, outputs)| async move {
155 let mut outputs = outputs?;
156
157 let Some(action) = stream.next().await else {
158 return Some((
159 Some(Action::Output(outputs)),
160 (stream, None),
161 ));
162 };
163
164 match action.output() {
165 Ok(output) => {
166 outputs.push(output);
167
168 Some((None, (stream, Some(outputs))))
169 }
170 Err(action) => {
171 Some((Some(action), (stream, Some(outputs))))
172 }
173 }
174 },
175 )
176 .filter_map(future::ready),
177 ))),
178 }
179 }
180
181 pub fn discard<O>(self) -> Task<O>
185 where
186 T: MaybeSend + 'static,
187 O: MaybeSend + 'static,
188 {
189 self.then(|_| Task::none())
190 }
191
192 pub fn abortable(self) -> (Self, Handle)
194 where
195 T: 'static,
196 {
197 match self.0 {
198 Some(stream) => {
199 let (stream, handle) = stream::abortable(stream);
200
201 (
202 Self(Some(boxed_stream(stream))),
203 Handle {
204 internal: InternalHandle::Manual(handle),
205 },
206 )
207 }
208 None => (
209 Self(None),
210 Handle {
211 internal: InternalHandle::Manual(
212 stream::AbortHandle::new_pair().0,
213 ),
214 },
215 ),
216 }
217 }
218
219 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
222 where
223 T: 'static,
224 {
225 Self::stream(stream::once(future))
226 }
227
228 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
231 where
232 T: 'static,
233 {
234 Self(Some(boxed_stream(stream.map(Action::Output))))
235 }
236}
237
238#[derive(Debug, Clone)]
240pub struct Handle {
241 internal: InternalHandle,
242}
243
244#[derive(Debug, Clone)]
245enum InternalHandle {
246 Manual(stream::AbortHandle),
247 AbortOnDrop(Arc<stream::AbortHandle>),
248}
249
250impl InternalHandle {
251 pub fn as_ref(&self) -> &stream::AbortHandle {
252 match self {
253 InternalHandle::Manual(handle) => handle,
254 InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
255 }
256 }
257}
258
259impl Handle {
260 pub fn abort(&self) {
262 self.internal.as_ref().abort();
263 }
264
265 pub fn abort_on_drop(self) -> Self {
274 match &self.internal {
275 InternalHandle::Manual(handle) => Self {
276 internal: InternalHandle::AbortOnDrop(Arc::new(handle.clone())),
277 },
278 InternalHandle::AbortOnDrop(_) => self,
279 }
280 }
281
282 pub fn is_aborted(&self) -> bool {
284 self.internal.as_ref().is_aborted()
285 }
286}
287
288impl Drop for Handle {
289 fn drop(&mut self) {
290 if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
291 let handle = std::mem::replace(
292 handle,
293 Arc::new(stream::AbortHandle::new_pair().0),
294 );
295
296 if let Some(handle) = Arc::into_inner(handle) {
297 handle.abort();
298 }
299 }
300 }
301}
302
303impl<T> Task<Option<T>> {
304 pub fn and_then<A>(
308 self,
309 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
310 ) -> Task<A>
311 where
312 T: MaybeSend + 'static,
313 A: MaybeSend + 'static,
314 {
315 self.then(move |option| option.map_or_else(Task::none, &f))
316 }
317}
318
319impl<T, E> Task<Result<T, E>> {
320 pub fn and_then<A>(
324 self,
325 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
326 ) -> Task<A>
327 where
328 T: MaybeSend + 'static,
329 E: MaybeSend + 'static,
330 A: MaybeSend + 'static,
331 {
332 self.then(move |option| option.map_or_else(|_| Task::none(), &f))
333 }
334}
335
336impl<T> From<()> for Task<T> {
337 fn from(_value: ()) -> Self {
338 Self::none()
339 }
340}
341
342pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
345where
346 T: Send + 'static,
347{
348 channel(move |sender| {
349 let operation =
350 widget::operation::map(Box::new(operation), move |value| {
351 let _ = sender.clone().try_send(value);
352 });
353
354 Action::Widget(Box::new(operation))
355 })
356}
357
358pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
361where
362 T: MaybeSend + 'static,
363{
364 let (sender, receiver) = oneshot::channel();
365
366 let action = f(sender);
367
368 Task(Some(boxed_stream(
369 stream::once(async move { action }).chain(
370 receiver.into_stream().filter_map(|result| async move {
371 Some(Action::Output(result.ok()?))
372 }),
373 ),
374 )))
375}
376
377pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
380where
381 T: MaybeSend + 'static,
382{
383 let (sender, receiver) = mpsc::channel(1);
384
385 let action = f(sender);
386
387 Task(Some(boxed_stream(
388 stream::once(async move { action })
389 .chain(receiver.map(|result| Action::Output(result))),
390 )))
391}
392
393pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
395 let action = action.into();
396
397 Task(Some(boxed_stream(stream::once(async move {
398 action.output().expect_err("no output")
399 }))))
400}
401
402pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
404 task.0
405}