iced_futures/subscription.rs
1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::window;
8use crate::futures::Stream;
9use crate::{BoxStream, MaybeSend};
10
11use std::any::TypeId;
12use std::hash::Hash;
13
14/// A subscription event.
15#[derive(Debug, Clone, PartialEq)]
16pub enum Event {
17 /// A user interacted with a user interface in a window.
18 Interaction {
19 /// The window holding the interface of the interaction.
20 window: window::Id,
21 /// The [`Event`] describing the interaction.
22 ///
23 /// [`Event`]: event::Event
24 event: event::Event,
25
26 /// The [`event::Status`] of the interaction.
27 status: event::Status,
28 },
29
30 /// A platform specific event.
31 PlatformSpecific(PlatformSpecific),
32}
33
34/// A platform specific event
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum PlatformSpecific {
37 /// A MacOS specific event
38 MacOS(MacOS),
39}
40
41/// Describes an event specific to MacOS
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum MacOS {
44 /// Triggered when the app receives an URL from the system
45 ///
46 /// _**Note:** For this event to be triggered, the executable needs to be properly [bundled]!_
47 ///
48 /// [bundled]: https://developer.apple.com/library/archive/documentation/CoreFoundation/Conceptual/CFBundles/BundleTypes/BundleTypes.html#//apple_ref/doc/uid/10000123i-CH101-SW19
49 ReceivedUrl(String),
50}
51
52/// A stream of runtime events.
53///
54/// It is the input of a [`Subscription`].
55pub type EventStream = BoxStream<Event>;
56
57/// The hasher used for identifying subscriptions.
58pub type Hasher = rustc_hash::FxHasher;
59
60/// A request to listen to external events.
61///
62/// Besides performing async actions on demand with `Task`, most
63/// applications also need to listen to external events passively.
64///
65/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
66/// and it will generate events as long as the user keeps requesting it.
67///
68/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
69/// connection, keyboard presses, mouse events, time ticks, etc.
70///
71/// # The Lifetime of a [`Subscription`]
72/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
73/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
74/// in the `subscription` function of an `application` or a `daemon`.
75///
76/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
77/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
78/// [`Stream`] and executing it in an async runtime.
79///
80/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
81/// to build a certain [`Stream`] together with some way to _identify_ it.
82///
83/// Identification is important because when a specific [`Subscription`] stops being returned to the
84/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
85/// [`Subscription`] to keep track of it.
86///
87/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
88/// and whenever necessary.
89///
90/// ```
91/// # mod iced {
92/// # pub mod time {
93/// # pub use iced_futures::backend::default::time::every;
94/// # pub use std::time::{Duration, Instant};
95/// # }
96/// #
97/// # pub use iced_futures::Subscription;
98/// # }
99/// use iced::time::{self, Duration, Instant};
100/// use iced::Subscription;
101///
102/// struct State {
103/// timer_enabled: bool,
104/// }
105///
106/// fn subscription(state: &State) -> Subscription<Instant> {
107/// if state.timer_enabled {
108/// time::every(Duration::from_secs(1))
109/// } else {
110/// Subscription::none()
111/// }
112/// }
113/// ```
114///
115/// [`Future`]: std::future::Future
116#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
117pub struct Subscription<T> {
118 recipes: Vec<Box<dyn Recipe<Output = T>>>,
119}
120
121impl<T> Subscription<T> {
122 /// Returns an empty [`Subscription`] that will not produce any output.
123 pub fn none() -> Self {
124 Self {
125 recipes: Vec::new(),
126 }
127 }
128
129 /// Returns a [`Subscription`] that will call the given function to create and
130 /// asynchronously run the given [`Stream`].
131 ///
132 /// # Creating an asynchronous worker with bidirectional communication
133 /// You can leverage this helper to create a [`Subscription`] that spawns
134 /// an asynchronous worker in the background and establish a channel of
135 /// communication with an `iced` application.
136 ///
137 /// You can achieve this by creating an `mpsc` channel inside the closure
138 /// and returning the `Sender` as a `Message` for the `Application`:
139 ///
140 /// ```
141 /// # mod iced {
142 /// # pub use iced_futures::Subscription;
143 /// # pub use iced_futures::futures;
144 /// # pub use iced_futures::stream;
145 /// # }
146 /// use iced::futures::channel::mpsc;
147 /// use iced::futures::sink::SinkExt;
148 /// use iced::futures::Stream;
149 /// use iced::stream;
150 /// use iced::Subscription;
151 ///
152 /// pub enum Event {
153 /// Ready(mpsc::Sender<Input>),
154 /// WorkFinished,
155 /// // ...
156 /// }
157 ///
158 /// enum Input {
159 /// DoSomeWork,
160 /// // ...
161 /// }
162 ///
163 /// fn some_worker() -> impl Stream<Item = Event> {
164 /// stream::channel(100, async |mut output| {
165 /// // Create channel
166 /// let (sender, mut receiver) = mpsc::channel(100);
167 ///
168 /// // Send the sender back to the application
169 /// output.send(Event::Ready(sender)).await;
170 ///
171 /// loop {
172 /// use iced_futures::futures::StreamExt;
173 ///
174 /// // Read next input sent from `Application`
175 /// let input = receiver.select_next_some().await;
176 ///
177 /// match input {
178 /// Input::DoSomeWork => {
179 /// // Do some async work...
180 ///
181 /// // Finally, we can optionally produce a message to tell the
182 /// // `Application` the work is done
183 /// output.send(Event::WorkFinished).await;
184 /// }
185 /// }
186 /// }
187 /// })
188 /// }
189 ///
190 /// fn subscription() -> Subscription<Event> {
191 /// Subscription::run(some_worker)
192 /// }
193 /// ```
194 ///
195 /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
196 /// connection open.
197 ///
198 /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.13/examples/websocket
199 pub fn run<S>(builder: fn() -> S) -> Self
200 where
201 S: Stream<Item = T> + MaybeSend + 'static,
202 T: 'static,
203 {
204 from_recipe(Runner {
205 data: builder,
206 spawn: |builder, _| builder(),
207 })
208 }
209
210 /// Returns a [`Subscription`] that will create and asynchronously run the
211 /// given [`Stream`].
212 ///
213 /// Both the `data` and the function pointer will be used to uniquely identify
214 /// the [`Subscription`].
215 pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
216 where
217 D: Hash + 'static,
218 S: Stream<Item = T> + MaybeSend + 'static,
219 T: 'static,
220 {
221 from_recipe(Runner {
222 data: (data, builder),
223 spawn: |(data, builder), _| builder(data),
224 })
225 }
226
227 /// Batches all the provided subscriptions and returns the resulting
228 /// [`Subscription`].
229 pub fn batch(
230 subscriptions: impl IntoIterator<Item = Subscription<T>>,
231 ) -> Self {
232 Self {
233 recipes: subscriptions
234 .into_iter()
235 .flat_map(|subscription| subscription.recipes)
236 .collect(),
237 }
238 }
239
240 /// Adds a value to the [`Subscription`] context.
241 ///
242 /// The value will be part of the identity of a [`Subscription`].
243 pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
244 where
245 T: 'static,
246 A: std::hash::Hash + Clone + Send + Sync + 'static,
247 {
248 Subscription {
249 recipes: self
250 .recipes
251 .drain(..)
252 .map(|recipe| {
253 Box::new(With::new(recipe, value.clone()))
254 as Box<dyn Recipe<Output = (A, T)>>
255 })
256 .collect(),
257 }
258 }
259
260 /// Transforms the [`Subscription`] output with the given function.
261 ///
262 /// # Panics
263 /// The closure provided must be a non-capturing closure. The method
264 /// will panic in debug mode otherwise.
265 pub fn map<F, A>(mut self, f: F) -> Subscription<A>
266 where
267 T: 'static,
268 F: Fn(T) -> A + MaybeSend + Clone + 'static,
269 A: 'static,
270 {
271 debug_assert!(
272 std::mem::size_of::<F>() == 0,
273 "the closure {} provided in `Subscription::map` is capturing",
274 std::any::type_name::<F>(),
275 );
276
277 Subscription {
278 recipes: self
279 .recipes
280 .drain(..)
281 .map(move |recipe| {
282 Box::new(Map::new(recipe, f.clone()))
283 as Box<dyn Recipe<Output = A>>
284 })
285 .collect(),
286 }
287 }
288}
289
290/// Creates a [`Subscription`] from a [`Recipe`] describing it.
291pub fn from_recipe<T>(
292 recipe: impl Recipe<Output = T> + 'static,
293) -> Subscription<T> {
294 Subscription {
295 recipes: vec![Box::new(recipe)],
296 }
297}
298
299/// Returns the different recipes of the [`Subscription`].
300pub fn into_recipes<T>(
301 subscription: Subscription<T>,
302) -> Vec<Box<dyn Recipe<Output = T>>> {
303 subscription.recipes
304}
305
306impl<T> std::fmt::Debug for Subscription<T> {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 f.debug_struct("Subscription").finish()
309 }
310}
311
312/// The description of a [`Subscription`].
313///
314/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
315/// by runtimes to run and identify subscriptions. You can use it to create your
316/// own!
317///
318/// # Examples
319/// The repository has a couple of [examples] that use a custom [`Recipe`]:
320///
321/// - [`download_progress`], a basic application that asynchronously downloads
322/// a dummy file of 100 MB and tracks the download progress.
323/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
324/// to listen to time.
325///
326/// [examples]: https://github.com/iced-rs/iced/tree/0.13/examples
327/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.13/examples/download_progress
328/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.13/examples/stopwatch
329pub trait Recipe {
330 /// The events that will be produced by a [`Subscription`] with this
331 /// [`Recipe`].
332 type Output;
333
334 /// Hashes the [`Recipe`].
335 ///
336 /// This is used by runtimes to uniquely identify a [`Subscription`].
337 fn hash(&self, state: &mut Hasher);
338
339 /// Executes the [`Recipe`] and produces the stream of events of its
340 /// [`Subscription`].
341 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
342}
343
344struct Map<A, B, F>
345where
346 F: Fn(A) -> B + 'static,
347{
348 recipe: Box<dyn Recipe<Output = A>>,
349 mapper: F,
350}
351
352impl<A, B, F> Map<A, B, F>
353where
354 F: Fn(A) -> B + 'static,
355{
356 fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
357 Map { recipe, mapper }
358 }
359}
360
361impl<A, B, F> Recipe for Map<A, B, F>
362where
363 A: 'static,
364 B: 'static,
365 F: Fn(A) -> B + 'static + MaybeSend,
366{
367 type Output = B;
368
369 fn hash(&self, state: &mut Hasher) {
370 TypeId::of::<F>().hash(state);
371 self.recipe.hash(state);
372 }
373
374 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
375 use futures::StreamExt;
376
377 let mapper = self.mapper;
378
379 Box::pin(self.recipe.stream(input).map(mapper))
380 }
381}
382
383struct With<A, B> {
384 recipe: Box<dyn Recipe<Output = A>>,
385 value: B,
386}
387
388impl<A, B> With<A, B> {
389 fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
390 With { recipe, value }
391 }
392}
393
394impl<A, B> Recipe for With<A, B>
395where
396 A: 'static,
397 B: 'static + std::hash::Hash + Clone + Send + Sync,
398{
399 type Output = (B, A);
400
401 fn hash(&self, state: &mut Hasher) {
402 std::any::TypeId::of::<B>().hash(state);
403 self.value.hash(state);
404 self.recipe.hash(state);
405 }
406
407 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
408 use futures::StreamExt;
409
410 let value = self.value;
411
412 Box::pin(
413 self.recipe
414 .stream(input)
415 .map(move |element| (value.clone(), element)),
416 )
417 }
418}
419
420pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
421where
422 I: Hash + 'static,
423 F: Fn(Event) -> Option<T> + MaybeSend + 'static,
424 T: 'static + MaybeSend,
425{
426 from_recipe(Runner {
427 data: id,
428 spawn: |_, events| {
429 use futures::future;
430 use futures::stream::StreamExt;
431
432 events.filter_map(move |event| future::ready(f(event)))
433 },
434 })
435}
436
437struct Runner<I, F, S, T>
438where
439 F: FnOnce(&I, EventStream) -> S,
440 S: Stream<Item = T>,
441{
442 data: I,
443 spawn: F,
444}
445
446impl<I, F, S, T> Recipe for Runner<I, F, S, T>
447where
448 I: Hash + 'static,
449 F: FnOnce(&I, EventStream) -> S,
450 S: Stream<Item = T> + MaybeSend + 'static,
451{
452 type Output = T;
453
454 fn hash(&self, state: &mut Hasher) {
455 std::any::TypeId::of::<I>().hash(state);
456 self.data.hash(state);
457 }
458
459 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
460 crate::boxed_stream((self.spawn)(&self.data, input))
461 }
462}