1use crate::subscription;
3use crate::{BoxStream, Executor, MaybeSend};
4
5use futures::{Sink, SinkExt, channel::mpsc};
6use std::marker::PhantomData;
7
8#[derive(Debug)]
15pub struct Runtime<Executor, Sender, Message> {
16 executor: Executor,
17 sender: Sender,
18 subscriptions: subscription::Tracker,
19 _message: PhantomData<Message>,
20}
21
22impl<Executor, Sender, Message> Runtime<Executor, Sender, Message>
23where
24 Executor: self::Executor,
25 Sender: Sink<Message, Error = mpsc::SendError> + Unpin + MaybeSend + Clone + 'static,
26 Message: MaybeSend + 'static,
27{
28 pub fn new(executor: Executor, sender: Sender) -> Self {
34 Self {
35 executor,
36 sender,
37 subscriptions: subscription::Tracker::new(),
38 _message: PhantomData,
39 }
40 }
41
42 pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
46 self.executor.enter(f)
47 }
48
49 #[cfg(not(target_arch = "wasm32"))]
51 pub fn block_on<T>(&mut self, future: impl Future<Output = T>) -> T {
52 self.executor.block_on(future)
53 }
54
55 pub fn run(&mut self, stream: BoxStream<Message>) {
62 use futures::{FutureExt, StreamExt};
63
64 let sender = self.sender.clone();
65 let future = stream.map(Ok).forward(sender).map(|result| match result {
66 Ok(()) => (),
67 Err(error) => {
68 log::warn!("Stream could not run until completion: {error}");
69 }
70 });
71
72 self.executor.spawn(future);
73 }
74
75 pub fn send(&mut self, message: Message) {
77 let mut sender = self.sender.clone();
78
79 self.executor.spawn(async move {
80 let _ = sender.send(message).await;
81 });
82 }
83
84 pub fn track(
92 &mut self,
93 recipes: impl IntoIterator<Item = Box<dyn subscription::Recipe<Output = Message>>>,
94 ) {
95 let Runtime {
96 executor,
97 subscriptions,
98 sender,
99 ..
100 } = self;
101
102 let futures = executor.enter(|| subscriptions.update(recipes.into_iter(), sender.clone()));
103
104 for future in futures {
105 executor.spawn(future);
106 }
107 }
108
109 pub fn broadcast(&mut self, event: subscription::Event) {
116 self.subscriptions.broadcast(event);
117 }
118}