iced_beacon/
client.rs

1use crate::Error;
2use crate::core::time::{Duration, SystemTime};
3use crate::span;
4use crate::theme;
5
6use semver::Version;
7use serde::{Deserialize, Serialize};
8use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
9use tokio::net;
10use tokio::sync::{Mutex, mpsc};
11use tokio::task;
12use tokio::time;
13
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicBool};
16use std::thread;
17
18#[derive(Debug, Clone)]
19pub struct Client {
20    sender: mpsc::Sender<Action>,
21    is_connected: Arc<AtomicBool>,
22    _handle: Arc<thread::JoinHandle<()>>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum Message {
27    Connected {
28        at: SystemTime,
29        name: String,
30        version: Version,
31        theme: Option<theme::Palette>,
32        can_time_travel: bool,
33    },
34    EventLogged {
35        at: SystemTime,
36        event: Event,
37    },
38    Quit {
39        at: SystemTime,
40    },
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub enum Event {
45    ThemeChanged(theme::Palette),
46    SpanStarted(span::Stage),
47    SpanFinished(span::Stage, Duration),
48    MessageLogged { number: usize, message: String },
49    CommandsSpawned(usize),
50    SubscriptionsTracked(usize),
51    LayersRendered(usize),
52}
53
54impl Client {
55    pub fn log(&self, event: Event) {
56        let _ = self.sender.try_send(Action::Send(Message::EventLogged {
57            at: SystemTime::now(),
58            event,
59        }));
60    }
61
62    pub fn is_connected(&self) -> bool {
63        self.is_connected.load(atomic::Ordering::Relaxed)
64    }
65
66    pub fn quit(&self) {
67        let _ = self.sender.try_send(Action::Send(Message::Quit {
68            at: SystemTime::now(),
69        }));
70    }
71
72    pub fn subscribe(&self) -> mpsc::Receiver<Command> {
73        let (sender, receiver) = mpsc::channel(100);
74        let _ = self.sender.try_send(Action::Forward(sender));
75
76        receiver
77    }
78}
79
80#[derive(Debug, Clone, Default)]
81pub struct Metadata {
82    pub name: &'static str,
83    pub theme: Option<theme::Palette>,
84    pub can_time_travel: bool,
85}
86
87#[must_use]
88pub fn connect(metadata: Metadata) -> Client {
89    let (sender, receiver) = mpsc::channel(10_000);
90    let is_connected = Arc::new(AtomicBool::new(false));
91
92    let handle = {
93        let is_connected = is_connected.clone();
94
95        std::thread::spawn(move || run(metadata, is_connected, receiver))
96    };
97
98    Client {
99        sender,
100        is_connected,
101        _handle: Arc::new(handle),
102    }
103}
104
105enum Action {
106    Send(Message),
107    Forward(mpsc::Sender<Command>),
108}
109
110#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
111pub enum Command {
112    RewindTo { message: usize },
113    GoLive,
114}
115
116#[tokio::main]
117async fn run(
118    mut metadata: Metadata,
119    is_connected: Arc<AtomicBool>,
120    mut receiver: mpsc::Receiver<Action>,
121) {
122    let version = semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("Parse package version");
123
124    let command_sender = {
125        // Discard by default
126        let (sender, _receiver) = mpsc::channel(1);
127
128        Arc::new(Mutex::new(sender))
129    };
130
131    loop {
132        match _connect().await {
133            Ok(stream) => {
134                is_connected.store(true, atomic::Ordering::Relaxed);
135
136                let (mut reader, mut writer) = stream.into_split();
137
138                let _ = send(
139                    &mut writer,
140                    Message::Connected {
141                        at: SystemTime::now(),
142                        name: metadata.name.to_owned(),
143                        version: version.clone(),
144                        can_time_travel: metadata.can_time_travel,
145                        theme: metadata.theme,
146                    },
147                )
148                .await;
149
150                {
151                    let command_sender = command_sender.clone();
152
153                    drop(task::spawn(async move {
154                        let mut buffer = Vec::new();
155
156                        loop {
157                            match receive(&mut reader, &mut buffer).await {
158                                Ok(command) => {
159                                    match command {
160                                        Command::RewindTo { .. } | Command::GoLive
161                                            if !metadata.can_time_travel =>
162                                        {
163                                            continue;
164                                        }
165                                        _ => {}
166                                    }
167
168                                    let sender = command_sender.lock().await;
169                                    let _ = sender.send(command).await;
170                                }
171                                Err(Error::DecodingFailed(_)) => {}
172                                Err(Error::IOFailed(_)) => break,
173                            }
174                        }
175                    }))
176                };
177
178                while let Some(action) = receiver.recv().await {
179                    match action {
180                        Action::Send(message) => {
181                            if let Message::EventLogged {
182                                event: Event::ThemeChanged(palette),
183                                ..
184                            } = message
185                            {
186                                metadata.theme = Some(palette);
187                            }
188
189                            match send(&mut writer, message).await {
190                                Ok(()) => {}
191                                Err(error) => {
192                                    if error.kind() != io::ErrorKind::BrokenPipe {
193                                        log::warn!("Error sending message to server: {error}");
194                                    }
195
196                                    is_connected.store(false, atomic::Ordering::Relaxed);
197                                    break;
198                                }
199                            }
200                        }
201                        Action::Forward(sender) => {
202                            *command_sender.lock().await = sender;
203                        }
204                    }
205                }
206            }
207            Err(_) => {
208                is_connected.store(false, atomic::Ordering::Relaxed);
209                time::sleep(time::Duration::from_secs(2)).await;
210            }
211        }
212    }
213}
214
/// Returns the address of the beacon server in this environment.
///
/// If the `ICED_BEACON_SERVER_ADDRESS` env variable can be read, its value
/// is returned as is.
///
/// Otherwise, the default local server address is returned.
pub fn server_address_from_env() -> String {
    const DEFAULT_ADDRESS: &str = "127.0.0.1:9167";

    match std::env::var("ICED_BEACON_SERVER_ADDRESS") {
        Ok(address) => address,
        Err(_) => DEFAULT_ADDRESS.to_owned(),
    }
}
226
227async fn _connect() -> Result<net::TcpStream, io::Error> {
228    log::debug!("Attempting to connect to server...");
229    let stream = net::TcpStream::connect(server_address_from_env()).await?;
230
231    stream.set_nodelay(true)?;
232    stream.writable().await?;
233
234    Ok(stream)
235}
236
237async fn send(stream: &mut net::tcp::OwnedWriteHalf, message: Message) -> Result<(), io::Error> {
238    let bytes = bincode::serialize(&message).expect("Encode input message");
239    let size = bytes.len() as u64;
240
241    stream.write_all(&size.to_be_bytes()).await?;
242    stream.write_all(&bytes).await?;
243    stream.flush().await?;
244
245    Ok(())
246}
247
248async fn receive(
249    stream: &mut net::tcp::OwnedReadHalf,
250    buffer: &mut Vec<u8>,
251) -> Result<Command, Error> {
252    let size = stream.read_u64().await? as usize;
253
254    if buffer.len() < size {
255        buffer.resize(size, 0);
256    }
257
258    let _n = stream.read_exact(&mut buffer[..size]).await?;
259
260    Ok(bincode::deserialize(buffer)?)
261}