iced_futures/subscription/tracker.rs
1use crate::subscription::{Event, Hasher, Recipe};
2use crate::{BoxFuture, MaybeSend};
3
4use futures::channel::mpsc;
5use futures::sink::{Sink, SinkExt};
6use rustc_hash::FxHashMap;
7
8use std::hash::Hasher as _;
9
10/// A registry of subscription streams.
11///
12/// If you have an application that continuously returns a [`Subscription`],
13/// you can use a [`Tracker`] to keep track of the different recipes and keep
14/// its executions alive.
15///
16/// [`Subscription`]: crate::Subscription
17#[derive(Debug, Default)]
18pub struct Tracker {
19 subscriptions: FxHashMap<u64, Execution>,
20}
21
22#[derive(Debug)]
23pub struct Execution {
24 _cancel: futures::channel::oneshot::Sender<()>,
25 listener: Option<futures::channel::mpsc::Sender<Event>>,
26}
27
28impl Tracker {
29 /// Creates a new empty [`Tracker`].
30 pub fn new() -> Self {
31 Self {
32 subscriptions: FxHashMap::default(),
33 }
34 }
35
36 /// Updates the [`Tracker`] with the given [`Subscription`].
37 ///
38 /// A [`Subscription`] can cause new streams to be spawned or old streams
39 /// to be closed.
40 ///
41 /// The [`Tracker`] keeps track of these streams between calls to this
42 /// method:
43 ///
44 /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
45 /// currently not being run, it will spawn a new stream and keep it alive.
46 /// - On the other hand, if a [`Recipe`] is currently in execution and the
47 /// provided [`Subscription`] does not contain it anymore, then the
48 /// [`Tracker`] will close and drop the relevant stream.
49 ///
50 /// It returns a list of futures that need to be spawned to materialize
51 /// the [`Tracker`] changes.
52 ///
53 /// [`Recipe`]: crate::subscription::Recipe
54 /// [`Subscription`]: crate::Subscription
55 pub fn update<Message, Receiver>(
56 &mut self,
57 recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
58 receiver: Receiver,
59 ) -> Vec<BoxFuture<()>>
60 where
61 Message: 'static + MaybeSend,
62 Receiver: 'static
63 + Sink<Message, Error = mpsc::SendError>
64 + Unpin
65 + MaybeSend
66 + Clone,
67 {
68 use futures::stream::StreamExt;
69
70 let mut futures: Vec<BoxFuture<()>> = Vec::new();
71 let mut alive = std::collections::HashSet::new();
72
73 for recipe in recipes {
74 let id = {
75 let mut hasher = Hasher::default();
76 recipe.hash(&mut hasher);
77
78 hasher.finish()
79 };
80
81 let _ = alive.insert(id);
82
83 if self.subscriptions.contains_key(&id) {
84 continue;
85 }
86
87 let (cancel, mut canceled) = futures::channel::oneshot::channel();
88
89 // TODO: Use bus if/when it supports async
90 let (event_sender, event_receiver) =
91 futures::channel::mpsc::channel(100);
92
93 let mut receiver = receiver.clone();
94 let mut stream = recipe.stream(event_receiver.boxed());
95
96 let future = async move {
97 loop {
98 let select =
99 futures::future::select(&mut canceled, stream.next());
100
101 match select.await {
102 futures::future::Either::Left(_)
103 | futures::future::Either::Right((None, _)) => break,
104 futures::future::Either::Right((Some(message), _)) => {
105 let _ = receiver.send(message).await;
106 }
107 }
108 }
109 };
110
111 let _ = self.subscriptions.insert(
112 id,
113 Execution {
114 _cancel: cancel,
115 listener: if event_sender.is_closed() {
116 None
117 } else {
118 Some(event_sender)
119 },
120 },
121 );
122
123 futures.push(Box::pin(future));
124 }
125
126 self.subscriptions.retain(|id, _| alive.contains(id));
127
128 futures
129 }
130
131 /// Broadcasts an event to the subscriptions currently alive.
132 ///
133 /// A subscription's [`Recipe::stream`] always receives a stream of events
134 /// as input. This stream can be used by some subscription to listen to
135 /// shell events.
136 ///
137 /// This method publishes the given event to all the subscription streams
138 /// currently open.
139 ///
140 /// [`Recipe::stream`]: crate::subscription::Recipe::stream
141 pub fn broadcast(&mut self, event: Event) {
142 self.subscriptions
143 .values_mut()
144 .filter_map(|connection| connection.listener.as_mut())
145 .for_each(|listener| {
146 if let Err(error) = listener.try_send(event.clone()) {
147 log::warn!(
148 "Error sending event to subscription: {error:?}"
149 );
150 }
151 });
152 }
153}