iced_beacon/
lib.rs

1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
24#[derive(Debug, Clone)]
25pub struct Connection {
26    commands: mpsc::Sender<client::Command>,
27}
28
29impl Connection {
30    pub fn rewind_to<'a>(&self, message: usize) -> impl Future<Output = ()> + 'a {
31        let commands = self.commands.clone();
32
33        async move {
34            let _ = commands.send(client::Command::RewindTo { message }).await;
35        }
36    }
37
38    pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
39        let commands = self.commands.clone();
40
41        async move {
42            let _ = commands.send(client::Command::GoLive).await;
43        }
44    }
45}
46
47#[derive(Debug, Clone)]
48pub enum Event {
49    Connected {
50        connection: Connection,
51        at: SystemTime,
52        name: String,
53        version: Version,
54        theme: Option<theme::Palette>,
55        can_time_travel: bool,
56    },
57    Disconnected {
58        at: SystemTime,
59    },
60    ThemeChanged {
61        at: SystemTime,
62        palette: theme::Palette,
63    },
64    SpanFinished {
65        at: SystemTime,
66        duration: Duration,
67        span: Span,
68    },
69    QuitRequested {
70        at: SystemTime,
71    },
72    AlreadyRunning {
73        at: SystemTime,
74    },
75}
76
77impl Event {
78    pub fn at(&self) -> SystemTime {
79        match self {
80            Self::Connected { at, .. }
81            | Self::Disconnected { at, .. }
82            | Self::ThemeChanged { at, .. }
83            | Self::SpanFinished { at, .. }
84            | Self::QuitRequested { at }
85            | Self::AlreadyRunning { at } => *at,
86        }
87    }
88}
89
90pub fn is_running() -> bool {
91    std::net::TcpListener::bind(client::server_address_from_env()).is_err()
92}
93
94pub fn run() -> impl Stream<Item = Event> {
95    stream::channel(|mut output| async move {
96        let mut buffer = Vec::new();
97
98        let server = loop {
99            match net::TcpListener::bind(client::server_address_from_env()).await {
100                Ok(server) => break server,
101                Err(error) => {
102                    if error.kind() == io::ErrorKind::AddrInUse {
103                        let _ = output
104                            .send(Event::AlreadyRunning {
105                                at: SystemTime::now(),
106                            })
107                            .await;
108                    }
109                    delay().await;
110                }
111            };
112        };
113
114        loop {
115            let Ok((stream, _)) = server.accept().await else {
116                continue;
117            };
118
119            let (mut reader, mut writer) = {
120                let _ = stream.set_nodelay(true);
121                stream.into_split()
122            };
123
124            let (command_sender, mut command_receiver) = mpsc::channel(1);
125            let mut last_message = String::new();
126            let mut last_update_number = 0;
127            let mut last_tasks = 0;
128            let mut last_subscriptions = 0;
129            let mut last_present_layers = 0;
130            let mut last_prepare = present::Stage::default();
131            let mut last_render = present::Stage::default();
132
133            drop(task::spawn(async move {
134                let mut last_message_number = None;
135
136                while let Some(command) = command_receiver.recv().await {
137                    match command {
138                        client::Command::RewindTo { message } => {
139                            if Some(message) == last_message_number {
140                                continue;
141                            }
142
143                            last_message_number = Some(message);
144                        }
145                        client::Command::GoLive => {
146                            last_message_number = None;
147                        }
148                    }
149
150                    let _ = send(&mut writer, command)
151                        .await
152                        .inspect_err(|error| log::error!("Error when sending command: {error}"));
153                }
154            }));
155
156            loop {
157                match receive(&mut reader, &mut buffer).await {
158                    Ok(message) => {
159                        match message {
160                            client::Message::Connected {
161                                at,
162                                name,
163                                version,
164                                theme,
165                                can_time_travel,
166                            } => {
167                                let _ = output
168                                    .send(Event::Connected {
169                                        connection: Connection {
170                                            commands: command_sender.clone(),
171                                        },
172                                        at,
173                                        name,
174                                        version,
175                                        theme,
176                                        can_time_travel,
177                                    })
178                                    .await;
179                            }
180                            client::Message::EventLogged { at, event } => match event {
181                                client::Event::ThemeChanged(palette) => {
182                                    let _ = output.send(Event::ThemeChanged { at, palette }).await;
183                                }
184                                client::Event::SubscriptionsTracked(amount_alive) => {
185                                    last_subscriptions = amount_alive;
186                                }
187                                client::Event::MessageLogged { number, message } => {
188                                    last_update_number = number;
189                                    last_message = message;
190                                }
191                                client::Event::CommandsSpawned(commands) => {
192                                    last_tasks = commands;
193                                }
194                                client::Event::LayersRendered(layers) => {
195                                    last_present_layers = layers;
196                                }
197                                client::Event::SpanStarted(span::Stage::Update) => {
198                                    last_message.clear();
199                                    last_tasks = 0;
200                                }
201                                client::Event::SpanStarted(_) => {}
202                                client::Event::SpanFinished(stage, duration) => {
203                                    let span = match stage {
204                                        span::Stage::Boot => Span::Boot,
205                                        span::Stage::Update => Span::Update {
206                                            number: last_update_number,
207                                            message: last_message.clone(),
208                                            tasks: last_tasks,
209                                            subscriptions: last_subscriptions,
210                                        },
211                                        span::Stage::View(window) => Span::View { window },
212                                        span::Stage::Layout(window) => Span::Layout { window },
213                                        span::Stage::Interact(window) => Span::Interact { window },
214                                        span::Stage::Draw(window) => Span::Draw { window },
215                                        span::Stage::Prepare(primitive)
216                                        | span::Stage::Render(primitive) => {
217                                            let stage = if matches!(stage, span::Stage::Prepare(_),)
218                                            {
219                                                &mut last_prepare
220                                            } else {
221                                                &mut last_render
222                                            };
223
224                                            let primitive = match primitive {
225                                                present::Primitive::Quad => &mut stage.quads,
226                                                present::Primitive::Triangle => {
227                                                    &mut stage.triangles
228                                                }
229                                                present::Primitive::Shader => &mut stage.shaders,
230                                                present::Primitive::Text => &mut stage.text,
231                                                present::Primitive::Image => &mut stage.images,
232                                            };
233
234                                            *primitive += duration;
235
236                                            continue;
237                                        }
238                                        span::Stage::Present(window) => {
239                                            let span = Span::Present {
240                                                window,
241                                                prepare: last_prepare,
242                                                render: last_render,
243                                                layers: last_present_layers,
244                                            };
245
246                                            last_prepare = present::Stage::default();
247                                            last_render = present::Stage::default();
248                                            last_present_layers = 0;
249
250                                            span
251                                        }
252                                        span::Stage::Custom(name) => Span::Custom { name },
253                                    };
254
255                                    let _ = output
256                                        .send(Event::SpanFinished { at, duration, span })
257                                        .await;
258                                }
259                            },
260                            client::Message::Quit { at } => {
261                                let _ = output.send(Event::QuitRequested { at }).await;
262                            }
263                        };
264                    }
265                    Err(Error::IOFailed(_)) => {
266                        let _ = output
267                            .send(Event::Disconnected {
268                                at: SystemTime::now(),
269                            })
270                            .await;
271                        break;
272                    }
273                    Err(Error::DecodingFailed(error)) => {
274                        log::warn!("Error decoding beacon output: {error}")
275                    }
276                }
277            }
278        }
279    })
280}
281
282async fn receive(
283    stream: &mut net::tcp::OwnedReadHalf,
284    buffer: &mut Vec<u8>,
285) -> Result<client::Message, Error> {
286    let size = stream.read_u64().await? as usize;
287
288    if buffer.len() < size {
289        buffer.resize(size, 0);
290    }
291
292    let _n = stream.read_exact(&mut buffer[..size]).await?;
293
294    Ok(bincode::deserialize(buffer)?)
295}
296
297async fn send(
298    stream: &mut net::tcp::OwnedWriteHalf,
299    command: client::Command,
300) -> Result<(), io::Error> {
301    let bytes = bincode::serialize(&command).expect("Encode input message");
302    let size = bytes.len() as u64;
303
304    stream.write_all(&size.to_be_bytes()).await?;
305    stream.write_all(&bytes).await?;
306    stream.flush().await?;
307
308    Ok(())
309}
310
311async fn delay() {
312    tokio::time::sleep(Duration::from_secs(2)).await;
313}