iced_futures/subscription/
tracker.rs

1use crate::subscription::{Event, Hasher, Recipe};
2use crate::{BoxFuture, MaybeSend};
3
4use futures::channel::mpsc;
5use futures::sink::{Sink, SinkExt};
6use rustc_hash::FxHashMap;
7
8use std::hash::Hasher as _;
9
10/// A registry of subscription streams.
11///
12/// If you have an application that continuously returns a [`Subscription`],
13/// you can use a [`Tracker`] to keep track of the different recipes and keep
14/// its executions alive.
15///
16/// [`Subscription`]: crate::Subscription
17#[derive(Debug, Default)]
18pub struct Tracker {
19    subscriptions: FxHashMap<u64, Execution>,
20}
21
22#[derive(Debug)]
23pub struct Execution {
24    _cancel: futures::channel::oneshot::Sender<()>,
25    listener: Option<futures::channel::mpsc::Sender<Event>>,
26}
27
28impl Tracker {
29    /// Creates a new empty [`Tracker`].
30    pub fn new() -> Self {
31        Self {
32            subscriptions: FxHashMap::default(),
33        }
34    }
35
36    /// Updates the [`Tracker`] with the given [`Subscription`].
37    ///
38    /// A [`Subscription`] can cause new streams to be spawned or old streams
39    /// to be closed.
40    ///
41    /// The [`Tracker`] keeps track of these streams between calls to this
42    /// method:
43    ///
44    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
45    ///   currently not being run, it will spawn a new stream and keep it alive.
46    /// - On the other hand, if a [`Recipe`] is currently in execution and the
47    ///   provided [`Subscription`] does not contain it anymore, then the
48    ///   [`Tracker`] will close and drop the relevant stream.
49    ///
50    /// It returns a list of futures that need to be spawned to materialize
51    /// the [`Tracker`] changes.
52    ///
53    /// [`Recipe`]: crate::subscription::Recipe
54    /// [`Subscription`]: crate::Subscription
55    pub fn update<Message, Receiver>(
56        &mut self,
57        recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
58        receiver: Receiver,
59    ) -> Vec<BoxFuture<()>>
60    where
61        Message: 'static + MaybeSend,
62        Receiver: 'static + Sink<Message, Error = mpsc::SendError> + Unpin + MaybeSend + Clone,
63    {
64        use futures::stream::StreamExt;
65
66        let mut futures: Vec<BoxFuture<()>> = Vec::new();
67        let mut alive = std::collections::HashSet::new();
68
69        for recipe in recipes {
70            let id = {
71                let mut hasher = Hasher::default();
72                recipe.hash(&mut hasher);
73
74                hasher.finish()
75            };
76
77            let _ = alive.insert(id);
78
79            if self.subscriptions.contains_key(&id) {
80                continue;
81            }
82
83            let (cancel, mut canceled) = futures::channel::oneshot::channel();
84
85            // TODO: Use bus if/when it supports async
86            let (event_sender, event_receiver) = futures::channel::mpsc::channel(100);
87
88            let mut receiver = receiver.clone();
89            let mut stream = recipe.stream(event_receiver.boxed());
90
91            let future = async move {
92                loop {
93                    let select = futures::future::select(&mut canceled, stream.next());
94
95                    match select.await {
96                        futures::future::Either::Left(_)
97                        | futures::future::Either::Right((None, _)) => break,
98                        futures::future::Either::Right((Some(message), _)) => {
99                            let _ = receiver.send(message).await;
100                        }
101                    }
102                }
103            };
104
105            let _ = self.subscriptions.insert(
106                id,
107                Execution {
108                    _cancel: cancel,
109                    listener: if event_sender.is_closed() {
110                        None
111                    } else {
112                        Some(event_sender)
113                    },
114                },
115            );
116
117            futures.push(Box::pin(future));
118        }
119
120        self.subscriptions.retain(|id, _| alive.contains(id));
121
122        futures
123    }
124
125    /// Broadcasts an event to the subscriptions currently alive.
126    ///
127    /// A subscription's [`Recipe::stream`] always receives a stream of events
128    /// as input. This stream can be used by some subscription to listen to
129    /// shell events.
130    ///
131    /// This method publishes the given event to all the subscription streams
132    /// currently open.
133    ///
134    /// [`Recipe::stream`]: crate::subscription::Recipe::stream
135    pub fn broadcast(&mut self, event: Event) {
136        self.subscriptions
137            .values_mut()
138            .filter_map(|connection| connection.listener.as_mut())
139            .for_each(|listener| {
140                if let Err(error) = listener.try_send(event.clone()) {
141                    log::warn!("Error sending event to subscription: {error:?}");
142                }
143            });
144    }
145}