1use crate::Error;
2use crate::core::time::{Duration, SystemTime};
3use crate::span;
4use crate::theme;
5
6use semver::Version;
7use serde::{Deserialize, Serialize};
8use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
9use tokio::net;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task;
12use tokio::time;
13
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicBool};
16use std::thread;
17
18#[derive(Debug, Clone)]
19pub struct Client {
20 sender: mpsc::Sender<Action>,
21 is_connected: Arc<AtomicBool>,
22 _handle: Arc<thread::JoinHandle<()>>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum Message {
27 Connected {
28 at: SystemTime,
29 name: String,
30 version: Version,
31 theme: Option<theme::Palette>,
32 can_time_travel: bool,
33 },
34 EventLogged {
35 at: SystemTime,
36 event: Event,
37 },
38 Quit {
39 at: SystemTime,
40 },
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub enum Event {
45 ThemeChanged(theme::Palette),
46 SpanStarted(span::Stage),
47 SpanFinished(span::Stage, Duration),
48 MessageLogged { number: usize, message: String },
49 CommandsSpawned(usize),
50 SubscriptionsTracked(usize),
51 LayersRendered(usize),
52}
53
54impl Client {
55 pub fn log(&self, event: Event) {
56 let _ = self.sender.try_send(Action::Send(Message::EventLogged {
57 at: SystemTime::now(),
58 event,
59 }));
60 }
61
62 pub fn is_connected(&self) -> bool {
63 self.is_connected.load(atomic::Ordering::Relaxed)
64 }
65
66 pub fn quit(&self) {
67 let _ = self.sender.try_send(Action::Send(Message::Quit {
68 at: SystemTime::now(),
69 }));
70 }
71
72 pub fn subscribe(&self) -> mpsc::Receiver<Command> {
73 let (sender, receiver) = mpsc::channel(100);
74 let _ = self.sender.try_send(Action::Forward(sender));
75
76 receiver
77 }
78}
79
80#[derive(Debug, Clone, Default)]
81pub struct Metadata {
82 pub name: &'static str,
83 pub theme: Option<theme::Palette>,
84 pub can_time_travel: bool,
85}
86
87#[must_use]
88pub fn connect(metadata: Metadata) -> Client {
89 let (sender, receiver) = mpsc::channel(10_000);
90 let is_connected = Arc::new(AtomicBool::new(false));
91
92 let handle = {
93 let is_connected = is_connected.clone();
94
95 std::thread::spawn(move || run(metadata, is_connected, receiver))
96 };
97
98 Client {
99 sender,
100 is_connected,
101 _handle: Arc::new(handle),
102 }
103}
104
105enum Action {
106 Send(Message),
107 Forward(mpsc::Sender<Command>),
108}
109
110#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
111pub enum Command {
112 RewindTo { message: usize },
113 GoLive,
114}
115
116#[tokio::main]
117async fn run(
118 mut metadata: Metadata,
119 is_connected: Arc<AtomicBool>,
120 mut receiver: mpsc::Receiver<Action>,
121) {
122 let version = semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("Parse package version");
123
124 let command_sender = {
125 let (sender, _receiver) = mpsc::channel(1);
127
128 Arc::new(Mutex::new(sender))
129 };
130
131 loop {
132 match _connect().await {
133 Ok(stream) => {
134 is_connected.store(true, atomic::Ordering::Relaxed);
135
136 let (mut reader, mut writer) = stream.into_split();
137
138 let _ = send(
139 &mut writer,
140 Message::Connected {
141 at: SystemTime::now(),
142 name: metadata.name.to_owned(),
143 version: version.clone(),
144 can_time_travel: metadata.can_time_travel,
145 theme: metadata.theme,
146 },
147 )
148 .await;
149
150 {
151 let command_sender = command_sender.clone();
152
153 drop(task::spawn(async move {
154 let mut buffer = Vec::new();
155
156 loop {
157 match receive(&mut reader, &mut buffer).await {
158 Ok(command) => {
159 match command {
160 Command::RewindTo { .. } | Command::GoLive
161 if !metadata.can_time_travel =>
162 {
163 continue;
164 }
165 _ => {}
166 }
167
168 let sender = command_sender.lock().await;
169 let _ = sender.send(command).await;
170 }
171 Err(Error::DecodingFailed(_)) => {}
172 Err(Error::IOFailed(_)) => break,
173 }
174 }
175 }))
176 };
177
178 while let Some(action) = receiver.recv().await {
179 match action {
180 Action::Send(message) => {
181 if let Message::EventLogged {
182 event: Event::ThemeChanged(palette),
183 ..
184 } = message
185 {
186 metadata.theme = Some(palette);
187 }
188
189 match send(&mut writer, message).await {
190 Ok(()) => {}
191 Err(error) => {
192 if error.kind() != io::ErrorKind::BrokenPipe {
193 log::warn!("Error sending message to server: {error}");
194 }
195
196 is_connected.store(false, atomic::Ordering::Relaxed);
197 break;
198 }
199 }
200 }
201 Action::Forward(sender) => {
202 *command_sender.lock().await = sender;
203 }
204 }
205 }
206 }
207 Err(_) => {
208 is_connected.store(false, atomic::Ordering::Relaxed);
209 time::sleep(time::Duration::from_secs(2)).await;
210 }
211 }
212 }
213}
214
/// Returns the address of the server to connect to.
///
/// The value of the `ICED_BEACON_SERVER_ADDRESS` environment variable is used
/// when it can be read; otherwise the address falls back to `127.0.0.1:9167`.
pub fn server_address_from_env() -> String {
    const DEFAULT_ADDRESS: &str = "127.0.0.1:9167";

    match std::env::var("ICED_BEACON_SERVER_ADDRESS") {
        Ok(address) => address,
        Err(_) => String::from(DEFAULT_ADDRESS),
    }
}
226
227async fn _connect() -> Result<net::TcpStream, io::Error> {
228 log::debug!("Attempting to connect to server...");
229 let stream = net::TcpStream::connect(server_address_from_env()).await?;
230
231 stream.set_nodelay(true)?;
232 stream.writable().await?;
233
234 Ok(stream)
235}
236
237async fn send(stream: &mut net::tcp::OwnedWriteHalf, message: Message) -> Result<(), io::Error> {
238 let bytes = bincode::serialize(&message).expect("Encode input message");
239 let size = bytes.len() as u64;
240
241 stream.write_all(&size.to_be_bytes()).await?;
242 stream.write_all(&bytes).await?;
243 stream.flush().await?;
244
245 Ok(())
246}
247
248async fn receive(
249 stream: &mut net::tcp::OwnedReadHalf,
250 buffer: &mut Vec<u8>,
251) -> Result<Command, Error> {
252 let size = stream.read_u64().await? as usize;
253
254 if buffer.len() < size {
255 buffer.resize(size, 0);
256 }
257
258 let _n = stream.read_exact(&mut buffer[..size]).await?;
259
260 Ok(bincode::deserialize(buffer)?)
261}