iced_futures/subscription/tracker.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
use crate::subscription::{Event, Hasher, Recipe};
use crate::{BoxFuture, MaybeSend};
use futures::channel::mpsc;
use futures::sink::{Sink, SinkExt};
use rustc_hash::FxHashMap;
use std::hash::Hasher as _;
/// A registry of subscription streams.
///
/// If you have an application that continuously returns a [`Subscription`],
/// you can use a [`Tracker`] to keep track of the different recipes and keep
/// its executions alive.
///
/// [`Subscription`]: crate::Subscription
#[derive(Debug, Default)]
pub struct Tracker {
subscriptions: FxHashMap<u64, Execution>,
}
#[derive(Debug)]
pub struct Execution {
_cancel: futures::channel::oneshot::Sender<()>,
listener: Option<futures::channel::mpsc::Sender<Event>>,
}
impl Tracker {
/// Creates a new empty [`Tracker`].
pub fn new() -> Self {
Self {
subscriptions: FxHashMap::default(),
}
}
/// Updates the [`Tracker`] with the given [`Subscription`].
///
/// A [`Subscription`] can cause new streams to be spawned or old streams
/// to be closed.
///
/// The [`Tracker`] keeps track of these streams between calls to this
/// method:
///
/// - If the provided [`Subscription`] contains a new [`Recipe`] that is
/// currently not being run, it will spawn a new stream and keep it alive.
/// - On the other hand, if a [`Recipe`] is currently in execution and the
/// provided [`Subscription`] does not contain it anymore, then the
/// [`Tracker`] will close and drop the relevant stream.
///
/// It returns a list of futures that need to be spawned to materialize
/// the [`Tracker`] changes.
///
/// [`Recipe`]: crate::subscription::Recipe
/// [`Subscription`]: crate::Subscription
pub fn update<Message, Receiver>(
&mut self,
recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
receiver: Receiver,
) -> Vec<BoxFuture<()>>
where
Message: 'static + MaybeSend,
Receiver: 'static
+ Sink<Message, Error = mpsc::SendError>
+ Unpin
+ MaybeSend
+ Clone,
{
use futures::stream::StreamExt;
let mut futures: Vec<BoxFuture<()>> = Vec::new();
let mut alive = std::collections::HashSet::new();
for recipe in recipes {
let id = {
let mut hasher = Hasher::default();
recipe.hash(&mut hasher);
hasher.finish()
};
let _ = alive.insert(id);
if self.subscriptions.contains_key(&id) {
continue;
}
let (cancel, mut canceled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async
let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100);
let mut receiver = receiver.clone();
let mut stream = recipe.stream(event_receiver.boxed());
let future = async move {
loop {
let select =
futures::future::select(&mut canceled, stream.next());
match select.await {
futures::future::Either::Left(_)
| futures::future::Either::Right((None, _)) => break,
futures::future::Either::Right((Some(message), _)) => {
let _ = receiver.send(message).await;
}
}
}
};
let _ = self.subscriptions.insert(
id,
Execution {
_cancel: cancel,
listener: if event_sender.is_closed() {
None
} else {
Some(event_sender)
},
},
);
futures.push(Box::pin(future));
}
self.subscriptions.retain(|id, _| alive.contains(id));
futures
}
/// Broadcasts an event to the subscriptions currently alive.
///
/// A subscription's [`Recipe::stream`] always receives a stream of events
/// as input. This stream can be used by some subscription to listen to
/// shell events.
///
/// This method publishes the given event to all the subscription streams
/// currently open.
///
/// [`Recipe::stream`]: crate::subscription::Recipe::stream
pub fn broadcast(&mut self, event: Event) {
self.subscriptions
.values_mut()
.filter_map(|connection| connection.listener.as_mut())
.for_each(|listener| {
if let Err(error) = listener.try_send(event.clone()) {
log::warn!(
"Error sending event to subscription: {error:?}"
);
}
});
}
}