iced_futures/subscription/tracker.rs
1use crate::subscription::{Event, Hasher, Recipe};
2use crate::{BoxFuture, MaybeSend};
3
4use futures::channel::mpsc;
5use futures::sink::{Sink, SinkExt};
6use rustc_hash::FxHashMap;
7
8use std::hash::Hasher as _;
9
10/// A registry of subscription streams.
11///
12/// If you have an application that continuously returns a [`Subscription`],
13/// you can use a [`Tracker`] to keep track of the different recipes and keep
14/// its executions alive.
15///
16/// [`Subscription`]: crate::Subscription
17#[derive(Debug, Default)]
18pub struct Tracker {
19 subscriptions: FxHashMap<u64, Execution>,
20}
21
22#[derive(Debug)]
23pub struct Execution {
24 _cancel: futures::channel::oneshot::Sender<()>,
25 listener: Option<futures::channel::mpsc::Sender<Event>>,
26}
27
28impl Tracker {
29 /// Creates a new empty [`Tracker`].
30 pub fn new() -> Self {
31 Self {
32 subscriptions: FxHashMap::default(),
33 }
34 }
35
36 /// Updates the [`Tracker`] with the given [`Subscription`].
37 ///
38 /// A [`Subscription`] can cause new streams to be spawned or old streams
39 /// to be closed.
40 ///
41 /// The [`Tracker`] keeps track of these streams between calls to this
42 /// method:
43 ///
44 /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
45 /// currently not being run, it will spawn a new stream and keep it alive.
46 /// - On the other hand, if a [`Recipe`] is currently in execution and the
47 /// provided [`Subscription`] does not contain it anymore, then the
48 /// [`Tracker`] will close and drop the relevant stream.
49 ///
50 /// It returns a list of futures that need to be spawned to materialize
51 /// the [`Tracker`] changes.
52 ///
53 /// [`Recipe`]: crate::subscription::Recipe
54 /// [`Subscription`]: crate::Subscription
55 pub fn update<Message, Receiver>(
56 &mut self,
57 recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
58 receiver: Receiver,
59 ) -> Vec<BoxFuture<()>>
60 where
61 Message: 'static + MaybeSend,
62 Receiver: 'static + Sink<Message, Error = mpsc::SendError> + Unpin + MaybeSend + Clone,
63 {
64 use futures::stream::StreamExt;
65
66 let mut futures: Vec<BoxFuture<()>> = Vec::new();
67 let mut alive = std::collections::HashSet::new();
68
69 for recipe in recipes {
70 let id = {
71 let mut hasher = Hasher::default();
72 recipe.hash(&mut hasher);
73
74 hasher.finish()
75 };
76
77 let _ = alive.insert(id);
78
79 if self.subscriptions.contains_key(&id) {
80 continue;
81 }
82
83 let (cancel, mut canceled) = futures::channel::oneshot::channel();
84
85 // TODO: Use bus if/when it supports async
86 let (event_sender, event_receiver) = futures::channel::mpsc::channel(100);
87
88 let mut receiver = receiver.clone();
89 let mut stream = recipe.stream(event_receiver.boxed());
90
91 let future = async move {
92 loop {
93 let select = futures::future::select(&mut canceled, stream.next());
94
95 match select.await {
96 futures::future::Either::Left(_)
97 | futures::future::Either::Right((None, _)) => break,
98 futures::future::Either::Right((Some(message), _)) => {
99 let _ = receiver.send(message).await;
100 }
101 }
102 }
103 };
104
105 let _ = self.subscriptions.insert(
106 id,
107 Execution {
108 _cancel: cancel,
109 listener: if event_sender.is_closed() {
110 None
111 } else {
112 Some(event_sender)
113 },
114 },
115 );
116
117 futures.push(Box::pin(future));
118 }
119
120 self.subscriptions.retain(|id, _| alive.contains(id));
121
122 futures
123 }
124
125 /// Broadcasts an event to the subscriptions currently alive.
126 ///
127 /// A subscription's [`Recipe::stream`] always receives a stream of events
128 /// as input. This stream can be used by some subscription to listen to
129 /// shell events.
130 ///
131 /// This method publishes the given event to all the subscription streams
132 /// currently open.
133 ///
134 /// [`Recipe::stream`]: crate::subscription::Recipe::stream
135 pub fn broadcast(&mut self, event: Event) {
136 self.subscriptions
137 .values_mut()
138 .filter_map(|connection| connection.listener.as_mut())
139 .for_each(|listener| {
140 if let Err(error) = listener.try_send(event.clone()) {
141 log::warn!("Error sending event to subscription: {error:?}");
142 }
143 });
144 }
145}