1use crate::Error;
2use crate::core::time::{Duration, SystemTime};
3use crate::span;
4use crate::theme;
5
6use semver::Version;
7use serde::{Deserialize, Serialize};
8use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
9use tokio::net;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task;
12use tokio::time;
13
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicBool};
16use std::thread;
17
18pub const SERVER_ADDRESS: &str = "127.0.0.1:9167";
19
20#[derive(Debug, Clone)]
21pub struct Client {
22 sender: mpsc::Sender<Action>,
23 is_connected: Arc<AtomicBool>,
24 _handle: Arc<thread::JoinHandle<()>>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub enum Message {
29 Connected {
30 at: SystemTime,
31 name: String,
32 version: Version,
33 theme: Option<theme::Palette>,
34 can_time_travel: bool,
35 },
36 EventLogged {
37 at: SystemTime,
38 event: Event,
39 },
40 Quit {
41 at: SystemTime,
42 },
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub enum Event {
47 ThemeChanged(theme::Palette),
48 SpanStarted(span::Stage),
49 SpanFinished(span::Stage, Duration),
50 MessageLogged { number: usize, message: String },
51 CommandsSpawned(usize),
52 SubscriptionsTracked(usize),
53 LayersRendered(usize),
54}
55
56impl Client {
57 pub fn log(&self, event: Event) {
58 let _ = self.sender.try_send(Action::Send(Message::EventLogged {
59 at: SystemTime::now(),
60 event,
61 }));
62 }
63
64 pub fn is_connected(&self) -> bool {
65 self.is_connected.load(atomic::Ordering::Relaxed)
66 }
67
68 pub fn quit(&self) {
69 let _ = self.sender.try_send(Action::Send(Message::Quit {
70 at: SystemTime::now(),
71 }));
72 }
73
74 pub fn subscribe(&self) -> mpsc::Receiver<Command> {
75 let (sender, receiver) = mpsc::channel(100);
76 let _ = self.sender.try_send(Action::Forward(sender));
77
78 receiver
79 }
80}
81
82#[derive(Debug, Clone, Default)]
83pub struct Metadata {
84 pub name: &'static str,
85 pub theme: Option<theme::Palette>,
86 pub can_time_travel: bool,
87}
88
89#[must_use]
90pub fn connect(metadata: Metadata) -> Client {
91 let (sender, receiver) = mpsc::channel(10_000);
92 let is_connected = Arc::new(AtomicBool::new(false));
93
94 let handle = {
95 let is_connected = is_connected.clone();
96
97 std::thread::spawn(move || run(metadata, is_connected, receiver))
98 };
99
100 Client {
101 sender,
102 is_connected,
103 _handle: Arc::new(handle),
104 }
105}
106
107enum Action {
108 Send(Message),
109 Forward(mpsc::Sender<Command>),
110}
111
112#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
113pub enum Command {
114 RewindTo { message: usize },
115 GoLive,
116}
117
118#[tokio::main]
119async fn run(
120 mut metadata: Metadata,
121 is_connected: Arc<AtomicBool>,
122 mut receiver: mpsc::Receiver<Action>,
123) {
124 let version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
125 .expect("Parse package version");
126
127 let command_sender = {
128 let (sender, _receiver) = mpsc::channel(1);
130
131 Arc::new(Mutex::new(sender))
132 };
133
134 loop {
135 match _connect().await {
136 Ok(stream) => {
137 is_connected.store(true, atomic::Ordering::Relaxed);
138
139 let (mut reader, mut writer) = stream.into_split();
140
141 let _ = send(
142 &mut writer,
143 Message::Connected {
144 at: SystemTime::now(),
145 name: metadata.name.to_owned(),
146 version: version.clone(),
147 can_time_travel: metadata.can_time_travel,
148 theme: metadata.theme,
149 },
150 )
151 .await;
152
153 {
154 let command_sender = command_sender.clone();
155
156 drop(task::spawn(async move {
157 let mut buffer = Vec::new();
158
159 loop {
160 match receive(&mut reader, &mut buffer).await {
161 Ok(command) => {
162 match command {
163 Command::RewindTo { .. }
164 | Command::GoLive
165 if !metadata.can_time_travel =>
166 {
167 continue;
168 }
169 _ => {}
170 }
171
172 let sender = command_sender.lock().await;
173 let _ = sender.send(command).await;
174 }
175 Err(Error::DecodingFailed(_)) => {}
176 Err(Error::IOFailed(_)) => break,
177 }
178 }
179 }))
180 };
181
182 while let Some(action) = receiver.recv().await {
183 match action {
184 Action::Send(message) => {
185 if let Message::EventLogged {
186 event: Event::ThemeChanged(palette),
187 ..
188 } = message
189 {
190 metadata.theme = Some(palette);
191 }
192
193 match send(&mut writer, message).await {
194 Ok(()) => {}
195 Err(error) => {
196 if error.kind() != io::ErrorKind::BrokenPipe
197 {
198 log::warn!(
199 "Error sending message to server: {error}"
200 );
201 }
202
203 is_connected.store(
204 false,
205 atomic::Ordering::Relaxed,
206 );
207 break;
208 }
209 }
210 }
211 Action::Forward(sender) => {
212 *command_sender.lock().await = sender;
213 }
214 }
215 }
216 }
217 Err(_) => {
218 is_connected.store(false, atomic::Ordering::Relaxed);
219 time::sleep(time::Duration::from_secs(2)).await;
220 }
221 }
222 }
223}
224
225async fn _connect() -> Result<net::TcpStream, io::Error> {
226 log::debug!("Attempting to connect to server...");
227 let stream = net::TcpStream::connect(SERVER_ADDRESS).await?;
228
229 stream.set_nodelay(true)?;
230 stream.writable().await?;
231
232 Ok(stream)
233}
234
235async fn send(
236 stream: &mut net::tcp::OwnedWriteHalf,
237 message: Message,
238) -> Result<(), io::Error> {
239 let bytes = bincode::serialize(&message).expect("Encode input message");
240 let size = bytes.len() as u64;
241
242 stream.write_all(&size.to_be_bytes()).await?;
243 stream.write_all(&bytes).await?;
244 stream.flush().await?;
245
246 Ok(())
247}
248
249async fn receive(
250 stream: &mut net::tcp::OwnedReadHalf,
251 buffer: &mut Vec<u8>,
252) -> Result<Command, Error> {
253 let size = stream.read_u64().await? as usize;
254
255 if buffer.len() < size {
256 buffer.resize(size, 0);
257 }
258
259 let _n = stream.read_exact(&mut buffer[..size]).await?;
260
261 Ok(bincode::deserialize(buffer)?)
262}