iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::window;
8use crate::futures::Stream;
9use crate::{BoxStream, MaybeSend};
10
11use std::any::TypeId;
12use std::hash::Hash;
13
/// A subscription event.
///
/// This is the item type of an [`EventStream`], which is the input handed to
/// [`Recipe::stream`].
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A user interacted with a user interface in a window.
    Interaction {
        /// The window holding the interface of the interaction.
        window: window::Id,
        /// The [`Event`] describing the interaction.
        ///
        /// [`Event`]: event::Event
        event: event::Event,

        /// The [`event::Status`] of the interaction.
        status: event::Status,
    },

    /// A platform-specific event.
    PlatformSpecific(PlatformSpecific),
}
33
/// A platform-specific event.
///
/// The only variant defined here wraps a [`MacOS`] event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlatformSpecific {
    /// A macOS-specific event.
    MacOS(MacOS),
}
40
/// Describes an event specific to macOS.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MacOS {
    /// Triggered when the app receives a URL from the system.
    ///
    /// The `String` payload presumably carries the received URL.
    ///
    /// _**Note:** For this event to be triggered, the executable needs to be properly [bundled]!_
    ///
    /// [bundled]: https://developer.apple.com/library/archive/documentation/CoreFoundation/Conceptual/CFBundles/BundleTypes/BundleTypes.html#//apple_ref/doc/uid/10000123i-CH101-SW19
    ReceivedUrl(String),
}
51
/// A stream of runtime events.
///
/// It is the input of a [`Subscription`]: each [`Recipe`] receives one as the
/// `input` argument of [`Recipe::stream`].
pub type EventStream = BoxStream<Event>;
56
/// The hasher used for identifying subscriptions.
///
/// Each [`Recipe`] writes its identity into one of these in [`Recipe::hash`].
pub type Hasher = rustc_hash::FxHasher;
59
/// A request to listen to external events.
///
/// Besides performing async actions on demand with `Task`, most
/// applications also need to listen to external events passively.
///
/// Much like a `Task`, a [`Subscription`] is normally provided to some
/// runtime, and it will generate events as long as the user keeps requesting
/// it.
///
/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// # The Lifetime of a [`Subscription`]
/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
/// in the `subscription` function of an `application` or a `daemon`.
///
/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
/// start running it asynchronously. Running a [`Subscription`] consists of building its underlying
/// [`Stream`] and executing it in an async runtime.
///
/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
/// to build a certain [`Stream`] together with some way to _identify_ it.
///
/// Identification is important because when a specific [`Subscription`] stops being returned to the
/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
/// [`Subscription`] to keep track of it.
///
/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
/// and whenever necessary.
///
/// ```
/// # mod iced {
/// #     pub mod time {
/// #         pub use iced_futures::backend::default::time::every;
/// #         pub use std::time::{Duration, Instant};
/// #     }
/// #
/// #     pub use iced_futures::Subscription;
/// # }
/// use iced::time::{self, Duration, Instant};
/// use iced::Subscription;
///
/// struct State {
///     timer_enabled: bool,
/// }
///
/// fn subscription(state: &State) -> Subscription<Instant> {
///     if state.timer_enabled {
///         time::every(Duration::from_secs(1))
///     } else {
///         Subscription::none()
///     }
/// }
/// ```
///
/// [`Future`]: std::future::Future
#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
pub struct Subscription<T> {
    // The boxed recipes making up this subscription; `Subscription::none`
    // leaves this empty and `Subscription::batch` concatenates several.
    recipes: Vec<Box<dyn Recipe<Output = T>>>,
}
120
121impl<T> Subscription<T> {
122    /// Returns an empty [`Subscription`] that will not produce any output.
123    pub fn none() -> Self {
124        Self {
125            recipes: Vec::new(),
126        }
127    }
128
129    /// Returns a [`Subscription`] that will call the given function to create and
130    /// asynchronously run the given [`Stream`].
131    ///
132    /// # Creating an asynchronous worker with bidirectional communication
133    /// You can leverage this helper to create a [`Subscription`] that spawns
134    /// an asynchronous worker in the background and establish a channel of
135    /// communication with an `iced` application.
136    ///
137    /// You can achieve this by creating an `mpsc` channel inside the closure
138    /// and returning the `Sender` as a `Message` for the `Application`:
139    ///
140    /// ```
141    /// # mod iced {
142    /// #     pub use iced_futures::Subscription;   
143    /// #     pub use iced_futures::futures;
144    /// #     pub use iced_futures::stream;
145    /// # }
146    /// use iced::futures::channel::mpsc;
147    /// use iced::futures::sink::SinkExt;
148    /// use iced::futures::Stream;
149    /// use iced::stream;
150    /// use iced::Subscription;
151    ///
152    /// pub enum Event {
153    ///     Ready(mpsc::Sender<Input>),
154    ///     WorkFinished,
155    ///     // ...
156    /// }
157    ///
158    /// enum Input {
159    ///     DoSomeWork,
160    ///     // ...
161    /// }
162    ///
163    /// fn some_worker() -> impl Stream<Item = Event> {
164    ///     stream::channel(100, async |mut output| {
165    ///         // Create channel
166    ///         let (sender, mut receiver) = mpsc::channel(100);
167    ///
168    ///         // Send the sender back to the application
169    ///         output.send(Event::Ready(sender)).await;
170    ///
171    ///         loop {
172    ///             use iced_futures::futures::StreamExt;
173    ///
174    ///             // Read next input sent from `Application`
175    ///             let input = receiver.select_next_some().await;
176    ///
177    ///             match input {
178    ///                 Input::DoSomeWork => {
179    ///                     // Do some async work...
180    ///
181    ///                     // Finally, we can optionally produce a message to tell the
182    ///                     // `Application` the work is done
183    ///                     output.send(Event::WorkFinished).await;
184    ///                 }
185    ///             }
186    ///         }
187    ///     })
188    /// }
189    ///
190    /// fn subscription() -> Subscription<Event> {
191    ///     Subscription::run(some_worker)
192    /// }
193    /// ```
194    ///
195    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
196    /// connection open.
197    ///
198    /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.13/examples/websocket
199    pub fn run<S>(builder: fn() -> S) -> Self
200    where
201        S: Stream<Item = T> + MaybeSend + 'static,
202        T: 'static,
203    {
204        from_recipe(Runner {
205            data: builder,
206            spawn: |builder, _| builder(),
207        })
208    }
209
210    /// Returns a [`Subscription`] that will create and asynchronously run the
211    /// given [`Stream`].
212    ///
213    /// Both the `data` and the function pointer will be used to uniquely identify
214    /// the [`Subscription`].
215    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
216    where
217        D: Hash + 'static,
218        S: Stream<Item = T> + MaybeSend + 'static,
219        T: 'static,
220    {
221        from_recipe(Runner {
222            data: (data, builder),
223            spawn: |(data, builder), _| builder(data),
224        })
225    }
226
227    /// Batches all the provided subscriptions and returns the resulting
228    /// [`Subscription`].
229    pub fn batch(
230        subscriptions: impl IntoIterator<Item = Subscription<T>>,
231    ) -> Self {
232        Self {
233            recipes: subscriptions
234                .into_iter()
235                .flat_map(|subscription| subscription.recipes)
236                .collect(),
237        }
238    }
239
240    /// Adds a value to the [`Subscription`] context.
241    ///
242    /// The value will be part of the identity of a [`Subscription`].
243    pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
244    where
245        T: 'static,
246        A: std::hash::Hash + Clone + Send + Sync + 'static,
247    {
248        Subscription {
249            recipes: self
250                .recipes
251                .drain(..)
252                .map(|recipe| {
253                    Box::new(With::new(recipe, value.clone()))
254                        as Box<dyn Recipe<Output = (A, T)>>
255                })
256                .collect(),
257        }
258    }
259
260    /// Transforms the [`Subscription`] output with the given function.
261    ///
262    /// # Panics
263    /// The closure provided must be a non-capturing closure. The method
264    /// will panic in debug mode otherwise.
265    pub fn map<F, A>(mut self, f: F) -> Subscription<A>
266    where
267        T: 'static,
268        F: Fn(T) -> A + MaybeSend + Clone + 'static,
269        A: 'static,
270    {
271        debug_assert!(
272            std::mem::size_of::<F>() == 0,
273            "the closure {} provided in `Subscription::map` is capturing",
274            std::any::type_name::<F>(),
275        );
276
277        Subscription {
278            recipes: self
279                .recipes
280                .drain(..)
281                .map(move |recipe| {
282                    Box::new(Map::new(recipe, f.clone()))
283                        as Box<dyn Recipe<Output = A>>
284                })
285                .collect(),
286        }
287    }
288
289    /// Returns the amount of recipe units in this [`Subscription`].
290    pub fn units(&self) -> usize {
291        self.recipes.len()
292    }
293}
294
295/// Creates a [`Subscription`] from a [`Recipe`] describing it.
296pub fn from_recipe<T>(
297    recipe: impl Recipe<Output = T> + 'static,
298) -> Subscription<T> {
299    Subscription {
300        recipes: vec![Box::new(recipe)],
301    }
302}
303
304/// Returns the different recipes of the [`Subscription`].
305pub fn into_recipes<T>(
306    subscription: Subscription<T>,
307) -> Vec<Box<dyn Recipe<Output = T>>> {
308    subscription.recipes
309}
310
311impl<T> std::fmt::Debug for Subscription<T> {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        f.debug_struct("Subscription").finish()
314    }
315}
316
/// The description of a [`Subscription`].
///
/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
/// # Examples
/// The repository has a couple of [examples] that use a custom [`Recipe`]:
///
/// - [`download_progress`], a basic application that asynchronously downloads
///   a dummy file of 100 MB and tracks the download progress.
/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
///   to listen to time.
///
/// [examples]: https://github.com/iced-rs/iced/tree/0.13/examples
/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.13/examples/download_progress
/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.13/examples/stopwatch
pub trait Recipe {
    /// The events that will be produced by a [`Subscription`] with this
    /// [`Recipe`].
    type Output;

    /// Hashes the [`Recipe`].
    ///
    /// This is used by runtimes to uniquely identify a [`Subscription`].
    /// Implementations write whatever distinguishes them into `state`.
    fn hash(&self, state: &mut Hasher);

    /// Executes the [`Recipe`] and produces the stream of events of its
    /// [`Subscription`].
    ///
    /// The boxed recipe is consumed. `input` is the stream of runtime
    /// [`Event`]s the recipe may read from.
    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
}
348
/// A [`Recipe`] adapter that applies `mapper` to every output of an inner
/// recipe.
struct Map<A, B, F>
where
    F: Fn(A) -> B + 'static,
{
    // The wrapped recipe producing values of type `A`.
    recipe: Box<dyn Recipe<Output = A>>,
    // The function turning each `A` into a `B`.
    mapper: F,
}
356
357impl<A, B, F> Map<A, B, F>
358where
359    F: Fn(A) -> B + 'static,
360{
361    fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
362        Map { recipe, mapper }
363    }
364}
365
366impl<A, B, F> Recipe for Map<A, B, F>
367where
368    A: 'static,
369    B: 'static,
370    F: Fn(A) -> B + 'static + MaybeSend,
371{
372    type Output = B;
373
374    fn hash(&self, state: &mut Hasher) {
375        TypeId::of::<F>().hash(state);
376        self.recipe.hash(state);
377    }
378
379    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
380        use futures::StreamExt;
381
382        let mapper = self.mapper;
383
384        Box::pin(self.recipe.stream(input).map(mapper))
385    }
386}
387
/// A [`Recipe`] adapter that pairs every output of an inner recipe with a
/// clone of `value`.
struct With<A, B> {
    // The wrapped recipe producing values of type `A`.
    recipe: Box<dyn Recipe<Output = A>>,
    // The value attached to each output; also hashed as part of the identity.
    value: B,
}
392
393impl<A, B> With<A, B> {
394    fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
395        With { recipe, value }
396    }
397}
398
399impl<A, B> Recipe for With<A, B>
400where
401    A: 'static,
402    B: 'static + std::hash::Hash + Clone + Send + Sync,
403{
404    type Output = (B, A);
405
406    fn hash(&self, state: &mut Hasher) {
407        std::any::TypeId::of::<B>().hash(state);
408        self.value.hash(state);
409        self.recipe.hash(state);
410    }
411
412    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
413        use futures::StreamExt;
414
415        let value = self.value;
416
417        Box::pin(
418            self.recipe
419                .stream(input)
420                .map(move |element| (value.clone(), element)),
421        )
422    }
423}
424
425pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
426where
427    I: Hash + 'static,
428    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
429    T: 'static + MaybeSend,
430{
431    from_recipe(Runner {
432        data: id,
433        spawn: |_, events| {
434            use futures::future;
435            use futures::stream::StreamExt;
436
437            events.filter_map(move |event| future::ready(f(event)))
438        },
439    })
440}
441
/// A [`Recipe`] identified by `data` whose stream is built by calling `spawn`.
struct Runner<I, F, S, T>
where
    F: FnOnce(&I, EventStream) -> S,
    S: Stream<Item = T>,
{
    // The value hashed to identify the recipe; also passed to `spawn`.
    data: I,
    // Builds the stream from `data` and the runtime event stream.
    spawn: F,
}
450
451impl<I, F, S, T> Recipe for Runner<I, F, S, T>
452where
453    I: Hash + 'static,
454    F: FnOnce(&I, EventStream) -> S,
455    S: Stream<Item = T> + MaybeSend + 'static,
456{
457    type Output = T;
458
459    fn hash(&self, state: &mut Hasher) {
460        std::any::TypeId::of::<I>().hash(state);
461        self.data.hash(state);
462    }
463
464    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
465        crate::boxed_stream((self.spawn)(&self.data, input))
466    }
467}