iced_futures/subscription/
tracker.rs

1use crate::subscription::{Event, Hasher, Recipe};
2use crate::{BoxFuture, MaybeSend};
3
4use futures::channel::mpsc;
5use futures::sink::{Sink, SinkExt};
6use rustc_hash::FxHashMap;
7
8use std::hash::Hasher as _;
9
10/// A registry of subscription streams.
11///
12/// If you have an application that continuously returns a [`Subscription`],
13/// you can use a [`Tracker`] to keep track of the different recipes and keep
14/// its executions alive.
15///
16/// [`Subscription`]: crate::Subscription
17#[derive(Debug, Default)]
18pub struct Tracker {
19    subscriptions: FxHashMap<u64, Execution>,
20}
21
22#[derive(Debug)]
23pub struct Execution {
24    _cancel: futures::channel::oneshot::Sender<()>,
25    listener: Option<futures::channel::mpsc::Sender<Event>>,
26}
27
28impl Tracker {
29    /// Creates a new empty [`Tracker`].
30    pub fn new() -> Self {
31        Self {
32            subscriptions: FxHashMap::default(),
33        }
34    }
35
36    /// Updates the [`Tracker`] with the given [`Subscription`].
37    ///
38    /// A [`Subscription`] can cause new streams to be spawned or old streams
39    /// to be closed.
40    ///
41    /// The [`Tracker`] keeps track of these streams between calls to this
42    /// method:
43    ///
44    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
45    ///   currently not being run, it will spawn a new stream and keep it alive.
46    /// - On the other hand, if a [`Recipe`] is currently in execution and the
47    ///   provided [`Subscription`] does not contain it anymore, then the
48    ///   [`Tracker`] will close and drop the relevant stream.
49    ///
50    /// It returns a list of futures that need to be spawned to materialize
51    /// the [`Tracker`] changes.
52    ///
53    /// [`Recipe`]: crate::subscription::Recipe
54    /// [`Subscription`]: crate::Subscription
55    pub fn update<Message, Receiver>(
56        &mut self,
57        recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
58        receiver: Receiver,
59    ) -> Vec<BoxFuture<()>>
60    where
61        Message: 'static + MaybeSend,
62        Receiver: 'static
63            + Sink<Message, Error = mpsc::SendError>
64            + Unpin
65            + MaybeSend
66            + Clone,
67    {
68        use futures::stream::StreamExt;
69
70        let mut futures: Vec<BoxFuture<()>> = Vec::new();
71        let mut alive = std::collections::HashSet::new();
72
73        for recipe in recipes {
74            let id = {
75                let mut hasher = Hasher::default();
76                recipe.hash(&mut hasher);
77
78                hasher.finish()
79            };
80
81            let _ = alive.insert(id);
82
83            if self.subscriptions.contains_key(&id) {
84                continue;
85            }
86
87            let (cancel, mut canceled) = futures::channel::oneshot::channel();
88
89            // TODO: Use bus if/when it supports async
90            let (event_sender, event_receiver) =
91                futures::channel::mpsc::channel(100);
92
93            let mut receiver = receiver.clone();
94            let mut stream = recipe.stream(event_receiver.boxed());
95
96            let future = async move {
97                loop {
98                    let select =
99                        futures::future::select(&mut canceled, stream.next());
100
101                    match select.await {
102                        futures::future::Either::Left(_)
103                        | futures::future::Either::Right((None, _)) => break,
104                        futures::future::Either::Right((Some(message), _)) => {
105                            let _ = receiver.send(message).await;
106                        }
107                    }
108                }
109            };
110
111            let _ = self.subscriptions.insert(
112                id,
113                Execution {
114                    _cancel: cancel,
115                    listener: if event_sender.is_closed() {
116                        None
117                    } else {
118                        Some(event_sender)
119                    },
120                },
121            );
122
123            futures.push(Box::pin(future));
124        }
125
126        self.subscriptions.retain(|id, _| alive.contains(id));
127
128        futures
129    }
130
131    /// Broadcasts an event to the subscriptions currently alive.
132    ///
133    /// A subscription's [`Recipe::stream`] always receives a stream of events
134    /// as input. This stream can be used by some subscription to listen to
135    /// shell events.
136    ///
137    /// This method publishes the given event to all the subscription streams
138    /// currently open.
139    ///
140    /// [`Recipe::stream`]: crate::subscription::Recipe::stream
141    pub fn broadcast(&mut self, event: Event) {
142        self.subscriptions
143            .values_mut()
144            .filter_map(|connection| connection.listener.as_mut())
145            .for_each(|listener| {
146                if let Err(error) = listener.try_send(event.clone()) {
147                    log::warn!(
148                        "Error sending event to subscription: {error:?}"
149                    );
150                }
151            });
152    }
153}