iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
15/// A subscription event.
16#[derive(Debug, Clone, PartialEq)]
17pub enum Event {
18    /// A user interacted with a user interface in a window.
19    Interaction {
20        /// The window holding the interface of the interaction.
21        window: window::Id,
22        /// The [`Event`] describing the interaction.
23        ///
24        /// [`Event`]: event::Event
25        event: event::Event,
26
27        /// The [`event::Status`] of the interaction.
28        status: event::Status,
29    },
30
31    /// The system theme has changed.
32    SystemThemeChanged(theme::Mode),
33
34    /// A platform specific event.
35    PlatformSpecific(PlatformSpecific),
36}
37
38/// A platform specific event
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum PlatformSpecific {
41    /// A MacOS specific event
42    MacOS(MacOS),
43}
44
45/// Describes an event specific to MacOS
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum MacOS {
48    /// Triggered when the app receives an URL from the system
49    ///
50    /// _**Note:** For this event to be triggered, the executable needs to be properly [bundled]!_
51    ///
52    /// [bundled]: https://developer.apple.com/library/archive/documentation/CoreFoundation/Conceptual/CFBundles/BundleTypes/BundleTypes.html#//apple_ref/doc/uid/10000123i-CH101-SW19
53    ReceivedUrl(String),
54}
55
56/// A stream of runtime events.
57///
58/// It is the input of a [`Subscription`].
59pub type EventStream = BoxStream<Event>;
60
61/// The hasher used for identifying subscriptions.
62pub type Hasher = rustc_hash::FxHasher;
63
64/// A request to listen to external events.
65///
66/// Besides performing async actions on demand with `Task`, most
67/// applications also need to listen to external events passively.
68///
69/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
70/// and it will generate events as long as the user keeps requesting it.
71///
72/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
73/// connection, keyboard presses, mouse events, time ticks, etc.
74///
75/// # The Lifetime of a [`Subscription`]
76/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
77/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
78/// in the `subscription` function of an `application` or a `daemon`.
79///
80/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
81/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
82/// [`Stream`] and executing it in an async runtime.
83///
84/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
85/// to build a certain [`Stream`] together with some way to _identify_ it.
86///
87/// Identification is important because when a specific [`Subscription`] stops being returned to the
88/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
89/// [`Subscription`] to keep track of it.
90///
91/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
92/// and whenever necessary.
93///
94/// ```
95/// # mod iced {
96/// #     pub mod time {
97/// #         pub use iced_futures::backend::default::time::every;
98/// #         pub use std::time::{Duration, Instant};
99/// #     }
100/// #
101/// #     pub use iced_futures::Subscription;
102/// # }
103/// use iced::time::{self, Duration, Instant};
104/// use iced::Subscription;
105///
106/// struct State {
107///     timer_enabled: bool,
108/// }
109///
110/// fn subscription(state: &State) -> Subscription<Instant> {
111///     if state.timer_enabled {
112///         time::every(Duration::from_secs(1))
113///     } else {
114///         Subscription::none()
115///     }
116/// }
117/// ```
118///
119/// [`Future`]: std::future::Future
120#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
121pub struct Subscription<T> {
122    recipes: Vec<Box<dyn Recipe<Output = T>>>,
123}
124
125impl<T> Subscription<T> {
126    /// Returns an empty [`Subscription`] that will not produce any output.
127    pub fn none() -> Self {
128        Self {
129            recipes: Vec::new(),
130        }
131    }
132
133    /// Returns a [`Subscription`] that will call the given function to create and
134    /// asynchronously run the given [`Stream`].
135    ///
136    /// # Creating an asynchronous worker with bidirectional communication
137    /// You can leverage this helper to create a [`Subscription`] that spawns
138    /// an asynchronous worker in the background and establish a channel of
139    /// communication with an `iced` application.
140    ///
141    /// You can achieve this by creating an `mpsc` channel inside the closure
142    /// and returning the `Sender` as a `Message` for the `Application`:
143    ///
144    /// ```
145    /// # mod iced {
146    /// #     pub use iced_futures::Subscription;   
147    /// #     pub use iced_futures::futures;
148    /// #     pub use iced_futures::stream;
149    /// # }
150    /// use iced::futures::channel::mpsc;
151    /// use iced::futures::sink::SinkExt;
152    /// use iced::futures::Stream;
153    /// use iced::stream;
154    /// use iced::Subscription;
155    ///
156    /// pub enum Event {
157    ///     Ready(mpsc::Sender<Input>),
158    ///     WorkFinished,
159    ///     // ...
160    /// }
161    ///
162    /// enum Input {
163    ///     DoSomeWork,
164    ///     // ...
165    /// }
166    ///
167    /// fn some_worker() -> impl Stream<Item = Event> {
168    ///     stream::channel(100, async |mut output| {
169    ///         // Create channel
170    ///         let (sender, mut receiver) = mpsc::channel(100);
171    ///
172    ///         // Send the sender back to the application
173    ///         output.send(Event::Ready(sender)).await;
174    ///
175    ///         loop {
176    ///             use iced_futures::futures::StreamExt;
177    ///
178    ///             // Read next input sent from `Application`
179    ///             let input = receiver.select_next_some().await;
180    ///
181    ///             match input {
182    ///                 Input::DoSomeWork => {
183    ///                     // Do some async work...
184    ///
185    ///                     // Finally, we can optionally produce a message to tell the
186    ///                     // `Application` the work is done
187    ///                     output.send(Event::WorkFinished).await;
188    ///                 }
189    ///             }
190    ///         }
191    ///     })
192    /// }
193    ///
194    /// fn subscription() -> Subscription<Event> {
195    ///     Subscription::run(some_worker)
196    /// }
197    /// ```
198    ///
199    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
200    /// connection open.
201    ///
202    /// [`websocket`]: https://github.com/iced-rs/iced/tree/master/examples/websocket
203    pub fn run<S>(builder: fn() -> S) -> Self
204    where
205        S: Stream<Item = T> + MaybeSend + 'static,
206        T: 'static,
207    {
208        from_recipe(Runner {
209            data: builder,
210            spawn: |builder, _| builder(),
211        })
212    }
213
214    /// Returns a [`Subscription`] that will create and asynchronously run the
215    /// given [`Stream`].
216    ///
217    /// Both the `data` and the function pointer will be used to uniquely identify
218    /// the [`Subscription`].
219    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
220    where
221        D: Hash + 'static,
222        S: Stream<Item = T> + MaybeSend + 'static,
223        T: 'static,
224    {
225        from_recipe(Runner {
226            data: (data, builder),
227            spawn: |(data, builder), _| builder(data),
228        })
229    }
230
231    /// Batches all the provided subscriptions and returns the resulting
232    /// [`Subscription`].
233    pub fn batch(
234        subscriptions: impl IntoIterator<Item = Subscription<T>>,
235    ) -> Self {
236        Self {
237            recipes: subscriptions
238                .into_iter()
239                .flat_map(|subscription| subscription.recipes)
240                .collect(),
241        }
242    }
243
244    /// Adds a value to the [`Subscription`] context.
245    ///
246    /// The value will be part of the identity of a [`Subscription`].
247    pub fn with<A>(self, value: A) -> Subscription<(A, T)>
248    where
249        T: 'static,
250        A: std::hash::Hash + Clone + Send + Sync + 'static,
251    {
252        struct With<A, B> {
253            recipe: Box<dyn Recipe<Output = A>>,
254            value: B,
255        }
256
257        impl<A, B> Recipe for With<A, B>
258        where
259            A: 'static,
260            B: 'static + std::hash::Hash + Clone + Send + Sync,
261        {
262            type Output = (B, A);
263
264            fn hash(&self, state: &mut Hasher) {
265                std::any::TypeId::of::<B>().hash(state);
266                self.value.hash(state);
267                self.recipe.hash(state);
268            }
269
270            fn stream(
271                self: Box<Self>,
272                input: EventStream,
273            ) -> BoxStream<Self::Output> {
274                use futures::StreamExt;
275
276                let value = self.value;
277
278                Box::pin(
279                    self.recipe
280                        .stream(input)
281                        .map(move |element| (value.clone(), element)),
282                )
283            }
284        }
285
286        Subscription {
287            recipes: self
288                .recipes
289                .into_iter()
290                .map(|recipe| {
291                    Box::new(With {
292                        recipe,
293                        value: value.clone(),
294                    }) as Box<dyn Recipe<Output = (A, T)>>
295                })
296                .collect(),
297        }
298    }
299
300    /// Transforms the [`Subscription`] output with the given function.
301    ///
302    /// The closure provided must be a non-capturing closure.
303    pub fn map<F, A>(self, f: F) -> Subscription<A>
304    where
305        T: 'static,
306        F: Fn(T) -> A + MaybeSend + Clone + 'static,
307        A: 'static,
308    {
309        const {
310            check_zero_sized::<F>();
311        }
312
313        struct Map<A, B, F>
314        where
315            F: Fn(A) -> B + 'static,
316        {
317            recipe: Box<dyn Recipe<Output = A>>,
318            mapper: F,
319        }
320
321        impl<A, B, F> Recipe for Map<A, B, F>
322        where
323            A: 'static,
324            B: 'static,
325            F: Fn(A) -> B + 'static + MaybeSend,
326        {
327            type Output = B;
328
329            fn hash(&self, state: &mut Hasher) {
330                TypeId::of::<F>().hash(state);
331                self.recipe.hash(state);
332            }
333
334            fn stream(
335                self: Box<Self>,
336                input: EventStream,
337            ) -> BoxStream<Self::Output> {
338                use futures::StreamExt;
339
340                Box::pin(self.recipe.stream(input).map(self.mapper))
341            }
342        }
343
344        Subscription {
345            recipes: self
346                .recipes
347                .into_iter()
348                .map(|recipe| {
349                    Box::new(Map {
350                        recipe,
351                        mapper: f.clone(),
352                    }) as Box<dyn Recipe<Output = A>>
353                })
354                .collect(),
355        }
356    }
357
358    /// Transforms the [`Subscription`] output with the given function, yielding only
359    /// values only when the function returns `Some(A)`.
360    ///
361    /// The closure provided must be a non-capturing closure.
362    pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
363    where
364        T: MaybeSend + 'static,
365        F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
366        A: MaybeSend + 'static,
367    {
368        const {
369            check_zero_sized::<F>();
370        }
371
372        struct FilterMap<A, B, F>
373        where
374            F: Fn(A) -> Option<B> + 'static,
375        {
376            recipe: Box<dyn Recipe<Output = A>>,
377            mapper: F,
378        }
379
380        impl<A, B, F> Recipe for FilterMap<A, B, F>
381        where
382            A: 'static,
383            B: 'static + MaybeSend,
384            F: Fn(A) -> Option<B> + MaybeSend,
385        {
386            type Output = B;
387
388            fn hash(&self, state: &mut Hasher) {
389                TypeId::of::<F>().hash(state);
390                self.recipe.hash(state);
391            }
392
393            fn stream(
394                self: Box<Self>,
395                input: EventStream,
396            ) -> BoxStream<Self::Output> {
397                use futures::StreamExt;
398                use futures::future;
399
400                let mapper = self.mapper;
401
402                Box::pin(
403                    self.recipe
404                        .stream(input)
405                        .filter_map(move |a| future::ready(mapper(a))),
406                )
407            }
408        }
409
410        Subscription {
411            recipes: self
412                .recipes
413                .drain(..)
414                .map(|recipe| {
415                    Box::new(FilterMap {
416                        recipe,
417                        mapper: f.clone(),
418                    }) as Box<dyn Recipe<Output = A>>
419                })
420                .collect(),
421        }
422    }
423
424    /// Returns the amount of recipe units in this [`Subscription`].
425    pub fn units(&self) -> usize {
426        self.recipes.len()
427    }
428}
429
430/// Creates a [`Subscription`] from a [`Recipe`] describing it.
431pub fn from_recipe<T>(
432    recipe: impl Recipe<Output = T> + 'static,
433) -> Subscription<T> {
434    Subscription {
435        recipes: vec![Box::new(recipe)],
436    }
437}
438
439/// Returns the different recipes of the [`Subscription`].
440pub fn into_recipes<T>(
441    subscription: Subscription<T>,
442) -> Vec<Box<dyn Recipe<Output = T>>> {
443    subscription.recipes
444}
445
446impl<T> std::fmt::Debug for Subscription<T> {
447    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448        f.debug_struct("Subscription").finish()
449    }
450}
451
452/// The description of a [`Subscription`].
453///
454/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
455/// by runtimes to run and identify subscriptions. You can use it to create your
456/// own!
457pub trait Recipe {
458    /// The events that will be produced by a [`Subscription`] with this
459    /// [`Recipe`].
460    type Output;
461
462    /// Hashes the [`Recipe`].
463    ///
464    /// This is used by runtimes to uniquely identify a [`Subscription`].
465    fn hash(&self, state: &mut Hasher);
466
467    /// Executes the [`Recipe`] and produces the stream of events of its
468    /// [`Subscription`].
469    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
470}
471
472/// Creates a [`Subscription`] from a hashable id and a filter function.
473pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
474where
475    I: Hash + 'static,
476    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
477    T: 'static + MaybeSend,
478{
479    from_recipe(Runner {
480        data: id,
481        spawn: |_, events| {
482            use futures::future;
483            use futures::stream::StreamExt;
484
485            events.filter_map(move |event| future::ready(f(event)))
486        },
487    })
488}
489
490struct Runner<I, F, S, T>
491where
492    F: FnOnce(&I, EventStream) -> S,
493    S: Stream<Item = T>,
494{
495    data: I,
496    spawn: F,
497}
498
499impl<I, F, S, T> Recipe for Runner<I, F, S, T>
500where
501    I: Hash + 'static,
502    F: FnOnce(&I, EventStream) -> S,
503    S: Stream<Item = T> + MaybeSend + 'static,
504{
505    type Output = T;
506
507    fn hash(&self, state: &mut Hasher) {
508        std::any::TypeId::of::<I>().hash(state);
509        self.data.hash(state);
510    }
511
512    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
513        crate::boxed_stream((self.spawn)(&self.data, input))
514    }
515}
516
517const fn check_zero_sized<T>() {
518    if std::mem::size_of::<T>() != 0 {
519        panic!(
520            "The Subscription closure provided is not non-capturing. \
521            Closures given to Subscription::map or filter_map cannot \
522            capture external variables. If you need to capture state, \
523            consider using Subscription::with."
524        );
525    }
526}