iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::window;
8use crate::futures::Stream;
9use crate::{BoxStream, MaybeSend};
10
11use std::any::TypeId;
12use std::hash::Hash;
13
14/// A subscription event.
15#[derive(Debug, Clone, PartialEq)]
16pub enum Event {
17    /// A user interacted with a user interface in a window.
18    Interaction {
19        /// The window holding the interface of the interaction.
20        window: window::Id,
21        /// The [`Event`] describing the interaction.
22        ///
23        /// [`Event`]: event::Event
24        event: event::Event,
25
26        /// The [`event::Status`] of the interaction.
27        status: event::Status,
28    },
29
30    /// A platform specific event.
31    PlatformSpecific(PlatformSpecific),
32}
33
34/// A platform specific event
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum PlatformSpecific {
37    /// A MacOS specific event
38    MacOS(MacOS),
39}
40
41/// Describes an event specific to MacOS
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum MacOS {
44    /// Triggered when the app receives an URL from the system
45    ///
46    /// _**Note:** For this event to be triggered, the executable needs to be properly [bundled]!_
47    ///
48    /// [bundled]: https://developer.apple.com/library/archive/documentation/CoreFoundation/Conceptual/CFBundles/BundleTypes/BundleTypes.html#//apple_ref/doc/uid/10000123i-CH101-SW19
49    ReceivedUrl(String),
50}
51
52/// A stream of runtime events.
53///
54/// It is the input of a [`Subscription`].
55pub type EventStream = BoxStream<Event>;
56
57/// The hasher used for identifying subscriptions.
58pub type Hasher = rustc_hash::FxHasher;
59
60/// A request to listen to external events.
61///
62/// Besides performing async actions on demand with `Task`, most
63/// applications also need to listen to external events passively.
64///
65/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
66/// and it will generate events as long as the user keeps requesting it.
67///
68/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
69/// connection, keyboard presses, mouse events, time ticks, etc.
70///
71/// # The Lifetime of a [`Subscription`]
72/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
73/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
74/// in the `subscription` function of an `application` or a `daemon`.
75///
76/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
77/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
78/// [`Stream`] and executing it in an async runtime.
79///
80/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
81/// to build a certain [`Stream`] together with some way to _identify_ it.
82///
83/// Identification is important because when a specific [`Subscription`] stops being returned to the
84/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
85/// [`Subscription`] to keep track of it.
86///
87/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
88/// and whenever necessary.
89///
90/// ```
91/// # mod iced {
92/// #     pub mod time {
93/// #         pub use iced_futures::backend::default::time::every;
94/// #         pub use std::time::{Duration, Instant};
95/// #     }
96/// #
97/// #     pub use iced_futures::Subscription;
98/// # }
99/// use iced::time::{self, Duration, Instant};
100/// use iced::Subscription;
101///
102/// struct State {
103///     timer_enabled: bool,
104/// }
105///
106/// fn subscription(state: &State) -> Subscription<Instant> {
107///     if state.timer_enabled {
108///         time::every(Duration::from_secs(1))
109///     } else {
110///         Subscription::none()
111///     }
112/// }
113/// ```
114///
115/// [`Future`]: std::future::Future
116#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
117pub struct Subscription<T> {
118    recipes: Vec<Box<dyn Recipe<Output = T>>>,
119}
120
121impl<T> Subscription<T> {
122    /// Returns an empty [`Subscription`] that will not produce any output.
123    pub fn none() -> Self {
124        Self {
125            recipes: Vec::new(),
126        }
127    }
128
129    /// Returns a [`Subscription`] that will call the given function to create and
130    /// asynchronously run the given [`Stream`].
131    ///
132    /// # Creating an asynchronous worker with bidirectional communication
133    /// You can leverage this helper to create a [`Subscription`] that spawns
134    /// an asynchronous worker in the background and establish a channel of
135    /// communication with an `iced` application.
136    ///
137    /// You can achieve this by creating an `mpsc` channel inside the closure
138    /// and returning the `Sender` as a `Message` for the `Application`:
139    ///
140    /// ```
141    /// # mod iced {
142    /// #     pub use iced_futures::Subscription;   
143    /// #     pub use iced_futures::futures;
144    /// #     pub use iced_futures::stream;
145    /// # }
146    /// use iced::futures::channel::mpsc;
147    /// use iced::futures::sink::SinkExt;
148    /// use iced::futures::Stream;
149    /// use iced::stream;
150    /// use iced::Subscription;
151    ///
152    /// pub enum Event {
153    ///     Ready(mpsc::Sender<Input>),
154    ///     WorkFinished,
155    ///     // ...
156    /// }
157    ///
158    /// enum Input {
159    ///     DoSomeWork,
160    ///     // ...
161    /// }
162    ///
163    /// fn some_worker() -> impl Stream<Item = Event> {
164    ///     stream::channel(100, async |mut output| {
165    ///         // Create channel
166    ///         let (sender, mut receiver) = mpsc::channel(100);
167    ///
168    ///         // Send the sender back to the application
169    ///         output.send(Event::Ready(sender)).await;
170    ///
171    ///         loop {
172    ///             use iced_futures::futures::StreamExt;
173    ///
174    ///             // Read next input sent from `Application`
175    ///             let input = receiver.select_next_some().await;
176    ///
177    ///             match input {
178    ///                 Input::DoSomeWork => {
179    ///                     // Do some async work...
180    ///
181    ///                     // Finally, we can optionally produce a message to tell the
182    ///                     // `Application` the work is done
183    ///                     output.send(Event::WorkFinished).await;
184    ///                 }
185    ///             }
186    ///         }
187    ///     })
188    /// }
189    ///
190    /// fn subscription() -> Subscription<Event> {
191    ///     Subscription::run(some_worker)
192    /// }
193    /// ```
194    ///
195    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
196    /// connection open.
197    ///
198    /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.13/examples/websocket
199    pub fn run<S>(builder: fn() -> S) -> Self
200    where
201        S: Stream<Item = T> + MaybeSend + 'static,
202        T: 'static,
203    {
204        from_recipe(Runner {
205            data: builder,
206            spawn: |builder, _| builder(),
207        })
208    }
209
210    /// Returns a [`Subscription`] that will create and asynchronously run the
211    /// given [`Stream`].
212    ///
213    /// Both the `data` and the function pointer will be used to uniquely identify
214    /// the [`Subscription`].
215    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
216    where
217        D: Hash + 'static,
218        S: Stream<Item = T> + MaybeSend + 'static,
219        T: 'static,
220    {
221        from_recipe(Runner {
222            data: (data, builder),
223            spawn: |(data, builder), _| builder(data),
224        })
225    }
226
227    /// Batches all the provided subscriptions and returns the resulting
228    /// [`Subscription`].
229    pub fn batch(
230        subscriptions: impl IntoIterator<Item = Subscription<T>>,
231    ) -> Self {
232        Self {
233            recipes: subscriptions
234                .into_iter()
235                .flat_map(|subscription| subscription.recipes)
236                .collect(),
237        }
238    }
239
240    /// Adds a value to the [`Subscription`] context.
241    ///
242    /// The value will be part of the identity of a [`Subscription`].
243    pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
244    where
245        T: 'static,
246        A: std::hash::Hash + Clone + Send + Sync + 'static,
247    {
248        Subscription {
249            recipes: self
250                .recipes
251                .drain(..)
252                .map(|recipe| {
253                    Box::new(With::new(recipe, value.clone()))
254                        as Box<dyn Recipe<Output = (A, T)>>
255                })
256                .collect(),
257        }
258    }
259
260    /// Transforms the [`Subscription`] output with the given function.
261    ///
262    /// # Panics
263    /// The closure provided must be a non-capturing closure. The method
264    /// will panic in debug mode otherwise.
265    pub fn map<F, A>(mut self, f: F) -> Subscription<A>
266    where
267        T: 'static,
268        F: Fn(T) -> A + MaybeSend + Clone + 'static,
269        A: 'static,
270    {
271        debug_assert!(
272            std::mem::size_of::<F>() == 0,
273            "the closure {} provided in `Subscription::map` is capturing",
274            std::any::type_name::<F>(),
275        );
276
277        Subscription {
278            recipes: self
279                .recipes
280                .drain(..)
281                .map(move |recipe| {
282                    Box::new(Map::new(recipe, f.clone()))
283                        as Box<dyn Recipe<Output = A>>
284                })
285                .collect(),
286        }
287    }
288}
289
290/// Creates a [`Subscription`] from a [`Recipe`] describing it.
291pub fn from_recipe<T>(
292    recipe: impl Recipe<Output = T> + 'static,
293) -> Subscription<T> {
294    Subscription {
295        recipes: vec![Box::new(recipe)],
296    }
297}
298
299/// Returns the different recipes of the [`Subscription`].
300pub fn into_recipes<T>(
301    subscription: Subscription<T>,
302) -> Vec<Box<dyn Recipe<Output = T>>> {
303    subscription.recipes
304}
305
306impl<T> std::fmt::Debug for Subscription<T> {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        f.debug_struct("Subscription").finish()
309    }
310}
311
312/// The description of a [`Subscription`].
313///
314/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
315/// by runtimes to run and identify subscriptions. You can use it to create your
316/// own!
317///
318/// # Examples
319/// The repository has a couple of [examples] that use a custom [`Recipe`]:
320///
321/// - [`download_progress`], a basic application that asynchronously downloads
322///   a dummy file of 100 MB and tracks the download progress.
323/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
324///   to listen to time.
325///
326/// [examples]: https://github.com/iced-rs/iced/tree/0.13/examples
327/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.13/examples/download_progress
328/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.13/examples/stopwatch
329pub trait Recipe {
330    /// The events that will be produced by a [`Subscription`] with this
331    /// [`Recipe`].
332    type Output;
333
334    /// Hashes the [`Recipe`].
335    ///
336    /// This is used by runtimes to uniquely identify a [`Subscription`].
337    fn hash(&self, state: &mut Hasher);
338
339    /// Executes the [`Recipe`] and produces the stream of events of its
340    /// [`Subscription`].
341    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
342}
343
344struct Map<A, B, F>
345where
346    F: Fn(A) -> B + 'static,
347{
348    recipe: Box<dyn Recipe<Output = A>>,
349    mapper: F,
350}
351
352impl<A, B, F> Map<A, B, F>
353where
354    F: Fn(A) -> B + 'static,
355{
356    fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
357        Map { recipe, mapper }
358    }
359}
360
361impl<A, B, F> Recipe for Map<A, B, F>
362where
363    A: 'static,
364    B: 'static,
365    F: Fn(A) -> B + 'static + MaybeSend,
366{
367    type Output = B;
368
369    fn hash(&self, state: &mut Hasher) {
370        TypeId::of::<F>().hash(state);
371        self.recipe.hash(state);
372    }
373
374    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
375        use futures::StreamExt;
376
377        let mapper = self.mapper;
378
379        Box::pin(self.recipe.stream(input).map(mapper))
380    }
381}
382
383struct With<A, B> {
384    recipe: Box<dyn Recipe<Output = A>>,
385    value: B,
386}
387
388impl<A, B> With<A, B> {
389    fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
390        With { recipe, value }
391    }
392}
393
394impl<A, B> Recipe for With<A, B>
395where
396    A: 'static,
397    B: 'static + std::hash::Hash + Clone + Send + Sync,
398{
399    type Output = (B, A);
400
401    fn hash(&self, state: &mut Hasher) {
402        std::any::TypeId::of::<B>().hash(state);
403        self.value.hash(state);
404        self.recipe.hash(state);
405    }
406
407    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
408        use futures::StreamExt;
409
410        let value = self.value;
411
412        Box::pin(
413            self.recipe
414                .stream(input)
415                .map(move |element| (value.clone(), element)),
416        )
417    }
418}
419
420pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
421where
422    I: Hash + 'static,
423    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
424    T: 'static + MaybeSend,
425{
426    from_recipe(Runner {
427        data: id,
428        spawn: |_, events| {
429            use futures::future;
430            use futures::stream::StreamExt;
431
432            events.filter_map(move |event| future::ready(f(event)))
433        },
434    })
435}
436
437struct Runner<I, F, S, T>
438where
439    F: FnOnce(&I, EventStream) -> S,
440    S: Stream<Item = T>,
441{
442    data: I,
443    spawn: F,
444}
445
446impl<I, F, S, T> Recipe for Runner<I, F, S, T>
447where
448    I: Hash + 'static,
449    F: FnOnce(&I, EventStream) -> S,
450    S: Stream<Item = T> + MaybeSend + 'static,
451{
452    type Output = T;
453
454    fn hash(&self, state: &mut Hasher) {
455        std::any::TypeId::of::<I>().hash(state);
456        self.data.hash(state);
457    }
458
459    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
460        crate::boxed_stream((self.spawn)(&self.data, input))
461    }
462}