iced_futures/subscription.rs
1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
15/// A subscription event.
16#[derive(Debug, Clone, PartialEq)]
17pub enum Event {
18 /// A user interacted with a user interface in a window.
19 Interaction {
20 /// The window holding the interface of the interaction.
21 window: window::Id,
22 /// The [`Event`] describing the interaction.
23 ///
24 /// [`Event`]: event::Event
25 event: event::Event,
26
27 /// The [`event::Status`] of the interaction.
28 status: event::Status,
29 },
30
31 /// The system theme has changed.
32 SystemThemeChanged(theme::Mode),
33
34 /// A platform specific event.
35 PlatformSpecific(PlatformSpecific),
36}
37
38/// A platform specific event
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum PlatformSpecific {
41 /// A MacOS specific event
42 MacOS(MacOS),
43}
44
/// An event that is specific to macOS.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MacOS {
    /// The application received a URL from the system.
    ///
    /// _**Note:** For this event to be triggered, the executable needs to be properly [bundled]!_
    ///
    /// [bundled]: https://developer.apple.com/library/archive/documentation/CoreFoundation/Conceptual/CFBundles/BundleTypes/BundleTypes.html#//apple_ref/doc/uid/10000123i-CH101-SW19
    ReceivedUrl(String),
}
55
56/// A stream of runtime events.
57///
58/// It is the input of a [`Subscription`].
59pub type EventStream = BoxStream<Event>;
60
61/// The hasher used for identifying subscriptions.
62pub type Hasher = rustc_hash::FxHasher;
63
64/// A request to listen to external events.
65///
66/// Besides performing async actions on demand with `Task`, most
67/// applications also need to listen to external events passively.
68///
69/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
70/// and it will generate events as long as the user keeps requesting it.
71///
72/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
73/// connection, keyboard presses, mouse events, time ticks, etc.
74///
75/// # The Lifetime of a [`Subscription`]
76/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
77/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
78/// in the `subscription` function of an `application` or a `daemon`.
79///
80/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
81/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
82/// [`Stream`] and executing it in an async runtime.
83///
84/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
85/// to build a certain [`Stream`] together with some way to _identify_ it.
86///
87/// Identification is important because when a specific [`Subscription`] stops being returned to the
88/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
89/// [`Subscription`] to keep track of it.
90///
91/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
92/// and whenever necessary.
93///
94/// ```
95/// # mod iced {
96/// # pub mod time {
97/// # pub use iced_futures::backend::default::time::every;
98/// # pub use std::time::{Duration, Instant};
99/// # }
100/// #
101/// # pub use iced_futures::Subscription;
102/// # }
103/// use iced::time::{self, Duration, Instant};
104/// use iced::Subscription;
105///
106/// struct State {
107/// timer_enabled: bool,
108/// }
109///
110/// fn subscription(state: &State) -> Subscription<Instant> {
111/// if state.timer_enabled {
112/// time::every(Duration::from_secs(1))
113/// } else {
114/// Subscription::none()
115/// }
116/// }
117/// ```
118///
119/// [`Future`]: std::future::Future
120#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
121pub struct Subscription<T> {
122 recipes: Vec<Box<dyn Recipe<Output = T>>>,
123}
124
125impl<T> Subscription<T> {
126 /// Returns an empty [`Subscription`] that will not produce any output.
127 pub fn none() -> Self {
128 Self {
129 recipes: Vec::new(),
130 }
131 }
132
133 /// Returns a [`Subscription`] that will call the given function to create and
134 /// asynchronously run the given [`Stream`].
135 ///
136 /// # Creating an asynchronous worker with bidirectional communication
137 /// You can leverage this helper to create a [`Subscription`] that spawns
138 /// an asynchronous worker in the background and establish a channel of
139 /// communication with an `iced` application.
140 ///
141 /// You can achieve this by creating an `mpsc` channel inside the closure
142 /// and returning the `Sender` as a `Message` for the `Application`:
143 ///
144 /// ```
145 /// # mod iced {
146 /// # pub use iced_futures::Subscription;
147 /// # pub use iced_futures::futures;
148 /// # pub use iced_futures::stream;
149 /// # }
150 /// use iced::futures::channel::mpsc;
151 /// use iced::futures::sink::SinkExt;
152 /// use iced::futures::Stream;
153 /// use iced::stream;
154 /// use iced::Subscription;
155 ///
156 /// pub enum Event {
157 /// Ready(mpsc::Sender<Input>),
158 /// WorkFinished,
159 /// // ...
160 /// }
161 ///
162 /// enum Input {
163 /// DoSomeWork,
164 /// // ...
165 /// }
166 ///
167 /// fn some_worker() -> impl Stream<Item = Event> {
168 /// stream::channel(100, async |mut output| {
169 /// // Create channel
170 /// let (sender, mut receiver) = mpsc::channel(100);
171 ///
172 /// // Send the sender back to the application
173 /// output.send(Event::Ready(sender)).await;
174 ///
175 /// loop {
176 /// use iced_futures::futures::StreamExt;
177 ///
178 /// // Read next input sent from `Application`
179 /// let input = receiver.select_next_some().await;
180 ///
181 /// match input {
182 /// Input::DoSomeWork => {
183 /// // Do some async work...
184 ///
185 /// // Finally, we can optionally produce a message to tell the
186 /// // `Application` the work is done
187 /// output.send(Event::WorkFinished).await;
188 /// }
189 /// }
190 /// }
191 /// })
192 /// }
193 ///
194 /// fn subscription() -> Subscription<Event> {
195 /// Subscription::run(some_worker)
196 /// }
197 /// ```
198 ///
199 /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
200 /// connection open.
201 ///
202 /// [`websocket`]: https://github.com/iced-rs/iced/tree/master/examples/websocket
203 pub fn run<S>(builder: fn() -> S) -> Self
204 where
205 S: Stream<Item = T> + MaybeSend + 'static,
206 T: 'static,
207 {
208 from_recipe(Runner {
209 data: builder,
210 spawn: |builder, _| builder(),
211 })
212 }
213
214 /// Returns a [`Subscription`] that will create and asynchronously run the
215 /// given [`Stream`].
216 ///
217 /// Both the `data` and the function pointer will be used to uniquely identify
218 /// the [`Subscription`].
219 pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
220 where
221 D: Hash + 'static,
222 S: Stream<Item = T> + MaybeSend + 'static,
223 T: 'static,
224 {
225 from_recipe(Runner {
226 data: (data, builder),
227 spawn: |(data, builder), _| builder(data),
228 })
229 }
230
231 /// Batches all the provided subscriptions and returns the resulting
232 /// [`Subscription`].
233 pub fn batch(subscriptions: impl IntoIterator<Item = Subscription<T>>) -> Self {
234 Self {
235 recipes: subscriptions
236 .into_iter()
237 .flat_map(|subscription| subscription.recipes)
238 .collect(),
239 }
240 }
241
242 /// Adds a value to the [`Subscription`] context.
243 ///
244 /// The value will be part of the identity of a [`Subscription`].
245 pub fn with<A>(self, value: A) -> Subscription<(A, T)>
246 where
247 T: 'static,
248 A: std::hash::Hash + Clone + Send + Sync + 'static,
249 {
250 struct With<A, B> {
251 recipe: Box<dyn Recipe<Output = A>>,
252 value: B,
253 }
254
255 impl<A, B> Recipe for With<A, B>
256 where
257 A: 'static,
258 B: 'static + std::hash::Hash + Clone + Send + Sync,
259 {
260 type Output = (B, A);
261
262 fn hash(&self, state: &mut Hasher) {
263 std::any::TypeId::of::<B>().hash(state);
264 self.value.hash(state);
265 self.recipe.hash(state);
266 }
267
268 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
269 use futures::StreamExt;
270
271 let value = self.value;
272
273 Box::pin(
274 self.recipe
275 .stream(input)
276 .map(move |element| (value.clone(), element)),
277 )
278 }
279 }
280
281 Subscription {
282 recipes: self
283 .recipes
284 .into_iter()
285 .map(|recipe| {
286 Box::new(With {
287 recipe,
288 value: value.clone(),
289 }) as Box<dyn Recipe<Output = (A, T)>>
290 })
291 .collect(),
292 }
293 }
294
295 /// Transforms the [`Subscription`] output with the given function.
296 ///
297 /// The closure provided must be a non-capturing closure.
298 pub fn map<F, A>(self, f: F) -> Subscription<A>
299 where
300 T: 'static,
301 F: Fn(T) -> A + MaybeSend + Clone + 'static,
302 A: 'static,
303 {
304 const {
305 check_zero_sized::<F>();
306 }
307
308 struct Map<A, B, F>
309 where
310 F: Fn(A) -> B + 'static,
311 {
312 recipe: Box<dyn Recipe<Output = A>>,
313 mapper: F,
314 }
315
316 impl<A, B, F> Recipe for Map<A, B, F>
317 where
318 A: 'static,
319 B: 'static,
320 F: Fn(A) -> B + 'static + MaybeSend,
321 {
322 type Output = B;
323
324 fn hash(&self, state: &mut Hasher) {
325 TypeId::of::<F>().hash(state);
326 self.recipe.hash(state);
327 }
328
329 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
330 use futures::StreamExt;
331
332 Box::pin(self.recipe.stream(input).map(self.mapper))
333 }
334 }
335
336 Subscription {
337 recipes: self
338 .recipes
339 .into_iter()
340 .map(|recipe| {
341 Box::new(Map {
342 recipe,
343 mapper: f.clone(),
344 }) as Box<dyn Recipe<Output = A>>
345 })
346 .collect(),
347 }
348 }
349
350 /// Transforms the [`Subscription`] output with the given function, yielding only
351 /// values only when the function returns `Some(A)`.
352 ///
353 /// The closure provided must be a non-capturing closure.
354 pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
355 where
356 T: MaybeSend + 'static,
357 F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
358 A: MaybeSend + 'static,
359 {
360 const {
361 check_zero_sized::<F>();
362 }
363
364 struct FilterMap<A, B, F>
365 where
366 F: Fn(A) -> Option<B> + 'static,
367 {
368 recipe: Box<dyn Recipe<Output = A>>,
369 mapper: F,
370 }
371
372 impl<A, B, F> Recipe for FilterMap<A, B, F>
373 where
374 A: 'static,
375 B: 'static + MaybeSend,
376 F: Fn(A) -> Option<B> + MaybeSend,
377 {
378 type Output = B;
379
380 fn hash(&self, state: &mut Hasher) {
381 TypeId::of::<F>().hash(state);
382 self.recipe.hash(state);
383 }
384
385 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
386 use futures::StreamExt;
387 use futures::future;
388
389 let mapper = self.mapper;
390
391 Box::pin(
392 self.recipe
393 .stream(input)
394 .filter_map(move |a| future::ready(mapper(a))),
395 )
396 }
397 }
398
399 Subscription {
400 recipes: self
401 .recipes
402 .drain(..)
403 .map(|recipe| {
404 Box::new(FilterMap {
405 recipe,
406 mapper: f.clone(),
407 }) as Box<dyn Recipe<Output = A>>
408 })
409 .collect(),
410 }
411 }
412
413 /// Returns the amount of recipe units in this [`Subscription`].
414 pub fn units(&self) -> usize {
415 self.recipes.len()
416 }
417}
418
419/// Creates a [`Subscription`] from a [`Recipe`] describing it.
420pub fn from_recipe<T>(recipe: impl Recipe<Output = T> + 'static) -> Subscription<T> {
421 Subscription {
422 recipes: vec![Box::new(recipe)],
423 }
424}
425
426/// Returns the different recipes of the [`Subscription`].
427pub fn into_recipes<T>(subscription: Subscription<T>) -> Vec<Box<dyn Recipe<Output = T>>> {
428 subscription.recipes
429}
430
431impl<T> std::fmt::Debug for Subscription<T> {
432 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
433 f.debug_struct("Subscription").finish()
434 }
435}
436
437/// The description of a [`Subscription`].
438///
439/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
440/// by runtimes to run and identify subscriptions. You can use it to create your
441/// own!
442pub trait Recipe {
443 /// The events that will be produced by a [`Subscription`] with this
444 /// [`Recipe`].
445 type Output;
446
447 /// Hashes the [`Recipe`].
448 ///
449 /// This is used by runtimes to uniquely identify a [`Subscription`].
450 fn hash(&self, state: &mut Hasher);
451
452 /// Executes the [`Recipe`] and produces the stream of events of its
453 /// [`Subscription`].
454 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
455}
456
457/// Creates a [`Subscription`] from a hashable id and a filter function.
458pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
459where
460 I: Hash + 'static,
461 F: Fn(Event) -> Option<T> + MaybeSend + 'static,
462 T: 'static + MaybeSend,
463{
464 from_recipe(Runner {
465 data: id,
466 spawn: |_, events| {
467 use futures::future;
468 use futures::stream::StreamExt;
469
470 events.filter_map(move |event| future::ready(f(event)))
471 },
472 })
473}
474
475struct Runner<I, F, S, T>
476where
477 F: FnOnce(&I, EventStream) -> S,
478 S: Stream<Item = T>,
479{
480 data: I,
481 spawn: F,
482}
483
484impl<I, F, S, T> Recipe for Runner<I, F, S, T>
485where
486 I: Hash + 'static,
487 F: FnOnce(&I, EventStream) -> S,
488 S: Stream<Item = T> + MaybeSend + 'static,
489{
490 type Output = T;
491
492 fn hash(&self, state: &mut Hasher) {
493 std::any::TypeId::of::<I>().hash(state);
494 self.data.hash(state);
495 }
496
497 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
498 crate::boxed_stream((self.spawn)(&self.data, input))
499 }
500}
501
/// Asserts that `T` is a zero-sized type.
///
/// `Subscription::map` and `Subscription::filter_map` call this from a `const`
/// block with the type of the closure they are given.
///
/// # Panics
/// Panics when the size of `T` is not zero.
const fn check_zero_sized<T>() {
    assert!(
        std::mem::size_of::<T>() == 0,
        "The Subscription closure provided is not non-capturing. \
         Closures given to Subscription::map or filter_map cannot \
         capture external variables. If you need to capture state, \
         consider using Subscription::with."
    );
}