iced_beacon/
client.rs

1use crate::Error;
2use crate::core::time::{Duration, SystemTime};
3use crate::span;
4use crate::theme;
5
6use semver::Version;
7use serde::{Deserialize, Serialize};
8use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
9use tokio::net;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task;
12use tokio::time;
13
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicBool};
16use std::thread;
17
/// The `host:port` address the client tries to connect to over TCP.
pub const SERVER_ADDRESS: &str = "127.0.0.1:9167";
19
/// A cloneable handle to the background connection started by [`connect`].
///
/// All methods are non-blocking: they enqueue work for the background thread
/// and silently drop it if the queue is full or closed.
#[derive(Debug, Clone)]
pub struct Client {
    // Queue of actions consumed by the background thread.
    sender: mpsc::Sender<Action>,
    // Flag shared with the background thread; it is set after a successful
    // connection and cleared when connecting or writing fails.
    is_connected: Arc<AtomicBool>,
    // Join handle of the background thread; stored but never joined here.
    _handle: Arc<thread::JoinHandle<()>>,
}
26
/// A message written by the client to the server.
///
/// Messages are encoded with `bincode`, so the order of variants and fields
/// is part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Sent right after every successful connection to the server.
    Connected {
        /// Time at which the message was created.
        at: SystemTime,
        /// Application name, taken from [`Metadata::name`].
        name: String,
        /// Version parsed from this crate's `CARGO_PKG_VERSION`.
        version: Version,
        /// Last known theme palette, if any.
        theme: Option<theme::Palette>,
        /// Whether the application accepts time-travel [`Command`]s.
        can_time_travel: bool,
    },
    /// Carries an [`Event`] reported through [`Client::log`].
    EventLogged {
        /// Time at which the event was logged.
        at: SystemTime,
        /// The event being reported.
        event: Event,
    },
    /// Sent by [`Client::quit`].
    Quit {
        /// Time at which the message was created.
        at: SystemTime,
    },
}
44
/// An event reported to the server inside a [`Message::EventLogged`].
///
/// Only [`Event::ThemeChanged`] is interpreted by this file; the exact
/// meaning of the other variants is defined by the code that logs them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Event {
    /// The theme palette changed. The background thread remembers the palette
    /// and reports it in later [`Message::Connected`] messages.
    ThemeChanged(theme::Palette),
    // NOTE(review): presumably marks the start of the given stage — confirm
    // against the `span` module.
    SpanStarted(span::Stage),
    // NOTE(review): presumably the stage that finished and how long it took —
    // confirm against the `span` module.
    SpanFinished(span::Stage, Duration),
    // NOTE(review): `number` presumably corresponds to the `message` index of
    // `Command::RewindTo` — confirm against the producer.
    MessageLogged { number: usize, message: String },
    CommandsSpawned(usize),
    SubscriptionsTracked(usize),
    LayersRendered(usize),
}
55
56impl Client {
57    pub fn log(&self, event: Event) {
58        let _ = self.sender.try_send(Action::Send(Message::EventLogged {
59            at: SystemTime::now(),
60            event,
61        }));
62    }
63
64    pub fn is_connected(&self) -> bool {
65        self.is_connected.load(atomic::Ordering::Relaxed)
66    }
67
68    pub fn quit(&self) {
69        let _ = self.sender.try_send(Action::Send(Message::Quit {
70            at: SystemTime::now(),
71        }));
72    }
73
74    pub fn subscribe(&self) -> mpsc::Receiver<Command> {
75        let (sender, receiver) = mpsc::channel(100);
76        let _ = self.sender.try_send(Action::Forward(sender));
77
78        receiver
79    }
80}
81
/// Information about the application, announced to the server in
/// [`Message::Connected`].
#[derive(Debug, Clone, Default)]
pub struct Metadata {
    /// Name reported to the server.
    pub name: &'static str,
    /// Initial theme palette, if known. The background thread replaces it
    /// whenever an [`Event::ThemeChanged`] is logged.
    pub theme: Option<theme::Palette>,
    /// When `false`, every [`Command`] received from the server is ignored
    /// instead of being forwarded to subscribers.
    pub can_time_travel: bool,
}
88
89#[must_use]
90pub fn connect(metadata: Metadata) -> Client {
91    let (sender, receiver) = mpsc::channel(10_000);
92    let is_connected = Arc::new(AtomicBool::new(false));
93
94    let handle = {
95        let is_connected = is_connected.clone();
96
97        std::thread::spawn(move || run(metadata, is_connected, receiver))
98    };
99
100    Client {
101        sender,
102        is_connected,
103        _handle: Arc::new(handle),
104    }
105}
106
/// Work item sent from a [`Client`] handle to the background thread.
enum Action {
    /// Write the message to the server.
    Send(Message),
    /// Replace the channel that commands received from the server are
    /// forwarded to.
    Forward(mpsc::Sender<Command>),
}
111
/// A command received from the server and forwarded to the channel returned
/// by [`Client::subscribe`].
///
/// Both variants are ignored when [`Metadata::can_time_travel`] is `false`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Command {
    // NOTE(review): `message` is presumably the index of a previously logged
    // message to rewind to — confirm against the server.
    RewindTo { message: usize },
    // NOTE(review): presumably leaves a rewound state and resumes live
    // execution — confirm against the server.
    GoLive,
}
117
118#[tokio::main]
119async fn run(
120    mut metadata: Metadata,
121    is_connected: Arc<AtomicBool>,
122    mut receiver: mpsc::Receiver<Action>,
123) {
124    let version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
125        .expect("Parse package version");
126
127    let command_sender = {
128        // Discard by default
129        let (sender, _receiver) = mpsc::channel(1);
130
131        Arc::new(Mutex::new(sender))
132    };
133
134    loop {
135        match _connect().await {
136            Ok(stream) => {
137                is_connected.store(true, atomic::Ordering::Relaxed);
138
139                let (mut reader, mut writer) = stream.into_split();
140
141                let _ = send(
142                    &mut writer,
143                    Message::Connected {
144                        at: SystemTime::now(),
145                        name: metadata.name.to_owned(),
146                        version: version.clone(),
147                        can_time_travel: metadata.can_time_travel,
148                        theme: metadata.theme,
149                    },
150                )
151                .await;
152
153                {
154                    let command_sender = command_sender.clone();
155
156                    drop(task::spawn(async move {
157                        let mut buffer = Vec::new();
158
159                        loop {
160                            match receive(&mut reader, &mut buffer).await {
161                                Ok(command) => {
162                                    match command {
163                                        Command::RewindTo { .. }
164                                        | Command::GoLive
165                                            if !metadata.can_time_travel =>
166                                        {
167                                            continue;
168                                        }
169                                        _ => {}
170                                    }
171
172                                    let sender = command_sender.lock().await;
173                                    let _ = sender.send(command).await;
174                                }
175                                Err(Error::DecodingFailed(_)) => {}
176                                Err(Error::IOFailed(_)) => break,
177                            }
178                        }
179                    }))
180                };
181
182                while let Some(action) = receiver.recv().await {
183                    match action {
184                        Action::Send(message) => {
185                            if let Message::EventLogged {
186                                event: Event::ThemeChanged(palette),
187                                ..
188                            } = message
189                            {
190                                metadata.theme = Some(palette);
191                            }
192
193                            match send(&mut writer, message).await {
194                                Ok(()) => {}
195                                Err(error) => {
196                                    if error.kind() != io::ErrorKind::BrokenPipe
197                                    {
198                                        log::warn!(
199                                            "Error sending message to server: {error}"
200                                        );
201                                    }
202
203                                    is_connected.store(
204                                        false,
205                                        atomic::Ordering::Relaxed,
206                                    );
207                                    break;
208                                }
209                            }
210                        }
211                        Action::Forward(sender) => {
212                            *command_sender.lock().await = sender;
213                        }
214                    }
215                }
216            }
217            Err(_) => {
218                is_connected.store(false, atomic::Ordering::Relaxed);
219                time::sleep(time::Duration::from_secs(2)).await;
220            }
221        }
222    }
223}
224
225async fn _connect() -> Result<net::TcpStream, io::Error> {
226    log::debug!("Attempting to connect to server...");
227    let stream = net::TcpStream::connect(SERVER_ADDRESS).await?;
228
229    stream.set_nodelay(true)?;
230    stream.writable().await?;
231
232    Ok(stream)
233}
234
235async fn send(
236    stream: &mut net::tcp::OwnedWriteHalf,
237    message: Message,
238) -> Result<(), io::Error> {
239    let bytes = bincode::serialize(&message).expect("Encode input message");
240    let size = bytes.len() as u64;
241
242    stream.write_all(&size.to_be_bytes()).await?;
243    stream.write_all(&bytes).await?;
244    stream.flush().await?;
245
246    Ok(())
247}
248
249async fn receive(
250    stream: &mut net::tcp::OwnedReadHalf,
251    buffer: &mut Vec<u8>,
252) -> Result<Command, Error> {
253    let size = stream.read_u64().await? as usize;
254
255    if buffer.len() < size {
256        buffer.resize(size, 0);
257    }
258
259    let _n = stream.read_exact(&mut buffer[..size]).await?;
260
261    Ok(bincode::deserialize(buffer)?)
262}