iced_beacon/
lib.rs

1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
24#[derive(Debug, Clone)]
25pub struct Connection {
26    commands: mpsc::Sender<client::Command>,
27}
28
29impl Connection {
30    pub fn rewind_to<'a>(
31        &self,
32        message: usize,
33    ) -> impl Future<Output = ()> + 'a {
34        let commands = self.commands.clone();
35
36        async move {
37            let _ = commands.send(client::Command::RewindTo { message }).await;
38        }
39    }
40
41    pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
42        let commands = self.commands.clone();
43
44        async move {
45            let _ = commands.send(client::Command::GoLive).await;
46        }
47    }
48}
49
50#[derive(Debug, Clone)]
51pub enum Event {
52    Connected {
53        connection: Connection,
54        at: SystemTime,
55        name: String,
56        version: Version,
57        theme: Option<theme::Palette>,
58        can_time_travel: bool,
59    },
60    Disconnected {
61        at: SystemTime,
62    },
63    ThemeChanged {
64        at: SystemTime,
65        palette: theme::Palette,
66    },
67    SpanFinished {
68        at: SystemTime,
69        duration: Duration,
70        span: Span,
71    },
72    QuitRequested {
73        at: SystemTime,
74    },
75    AlreadyRunning {
76        at: SystemTime,
77    },
78}
79
80impl Event {
81    pub fn at(&self) -> SystemTime {
82        match self {
83            Self::Connected { at, .. }
84            | Self::Disconnected { at, .. }
85            | Self::ThemeChanged { at, .. }
86            | Self::SpanFinished { at, .. }
87            | Self::QuitRequested { at }
88            | Self::AlreadyRunning { at } => *at,
89        }
90    }
91}
92
93pub fn is_running() -> bool {
94    std::net::TcpListener::bind(client::SERVER_ADDRESS).is_err()
95}
96
97pub fn run() -> impl Stream<Item = Event> {
98    stream::channel(|mut output| async move {
99        let mut buffer = Vec::new();
100
101        let server = loop {
102            match net::TcpListener::bind(client::SERVER_ADDRESS).await {
103                Ok(server) => break server,
104                Err(error) => {
105                    if error.kind() == io::ErrorKind::AddrInUse {
106                        let _ = output
107                            .send(Event::AlreadyRunning {
108                                at: SystemTime::now(),
109                            })
110                            .await;
111                    }
112                    delay().await;
113                }
114            };
115        };
116
117        loop {
118            let Ok((stream, _)) = server.accept().await else {
119                continue;
120            };
121
122            let (mut reader, mut writer) = {
123                let _ = stream.set_nodelay(true);
124                stream.into_split()
125            };
126
127            let (command_sender, mut command_receiver) = mpsc::channel(1);
128            let mut last_message = String::new();
129            let mut last_update_number = 0;
130            let mut last_tasks = 0;
131            let mut last_subscriptions = 0;
132            let mut last_present_layers = 0;
133            let mut last_prepare = present::Stage::default();
134            let mut last_render = present::Stage::default();
135
136            drop(task::spawn(async move {
137                let mut last_message_number = None;
138
139                while let Some(command) = command_receiver.recv().await {
140                    match command {
141                        client::Command::RewindTo { message } => {
142                            if Some(message) == last_message_number {
143                                continue;
144                            }
145
146                            last_message_number = Some(message);
147                        }
148                        client::Command::GoLive => {
149                            last_message_number = None;
150                        }
151                    }
152
153                    let _ =
154                        send(&mut writer, command).await.inspect_err(|error| {
155                            log::error!("Error when sending command: {error}")
156                        });
157                }
158            }));
159
160            loop {
161                match receive(&mut reader, &mut buffer).await {
162                    Ok(message) => {
163                        match message {
164                            client::Message::Connected {
165                                at,
166                                name,
167                                version,
168                                theme,
169                                can_time_travel,
170                            } => {
171                                let _ = output
172                                    .send(Event::Connected {
173                                        connection: Connection {
174                                            commands: command_sender.clone(),
175                                        },
176                                        at,
177                                        name,
178                                        version,
179                                        theme,
180                                        can_time_travel,
181                                    })
182                                    .await;
183                            }
184                            client::Message::EventLogged { at, event } => {
185                                match event {
186                                    client::Event::ThemeChanged(palette) => {
187                                        let _ = output
188                                            .send(Event::ThemeChanged {
189                                                at,
190                                                palette,
191                                            })
192                                            .await;
193                                    }
194                                    client::Event::SubscriptionsTracked(
195                                        amount_alive,
196                                    ) => {
197                                        last_subscriptions = amount_alive;
198                                    }
199                                    client::Event::MessageLogged {
200                                        number,
201                                        message,
202                                    } => {
203                                        last_update_number = number;
204                                        last_message = message;
205                                    }
206                                    client::Event::CommandsSpawned(
207                                        commands,
208                                    ) => {
209                                        last_tasks = commands;
210                                    }
211                                    client::Event::LayersRendered(layers) => {
212                                        last_present_layers = layers;
213                                    }
214                                    client::Event::SpanStarted(
215                                        span::Stage::Update,
216                                    ) => {
217                                        last_message.clear();
218                                        last_tasks = 0;
219                                    }
220                                    client::Event::SpanStarted(_) => {}
221                                    client::Event::SpanFinished(
222                                        stage,
223                                        duration,
224                                    ) => {
225                                        let span = match stage {
226                                            span::Stage::Boot => Span::Boot,
227                                            span::Stage::Update => {
228                                                Span::Update {
229                                                    number: last_update_number,
230                                                    message: last_message
231                                                        .clone(),
232                                                    tasks: last_tasks,
233                                                    subscriptions:
234                                                        last_subscriptions,
235                                                }
236                                            }
237                                            span::Stage::View(window) => {
238                                                Span::View { window }
239                                            }
240                                            span::Stage::Layout(window) => {
241                                                Span::Layout { window }
242                                            }
243                                            span::Stage::Interact(window) => {
244                                                Span::Interact { window }
245                                            }
246                                            span::Stage::Draw(window) => {
247                                                Span::Draw { window }
248                                            }
249                                            span::Stage::Prepare(primitive)
250                                            | span::Stage::Render(primitive) => {
251                                                let stage = if matches!(
252                                                    stage,
253                                                    span::Stage::Prepare(_),
254                                                ) {
255                                                    &mut last_prepare
256                                                } else {
257                                                    &mut last_render
258                                                };
259
260                                                let primitive = match primitive {
261                                                    present::Primitive::Quad => &mut stage.quads,
262                                                    present::Primitive::Triangle => &mut stage.triangles,
263                                                    present::Primitive::Shader => &mut stage.shaders,
264                                                    present::Primitive::Text => &mut stage.text,
265                                                    present::Primitive::Image => &mut stage.images,
266                                                };
267
268                                                *primitive += duration;
269
270                                                continue;
271                                            }
272                                            span::Stage::Present(window) => {
273                                                let span = Span::Present {
274                                                    window,
275                                                    prepare: last_prepare,
276                                                    render: last_render,
277                                                    layers: last_present_layers,
278                                                };
279
280                                                last_prepare =
281                                                    present::Stage::default();
282                                                last_render =
283                                                    present::Stage::default();
284                                                last_present_layers = 0;
285
286                                                span
287                                            }
288                                            span::Stage::Custom(name) => {
289                                                Span::Custom { name }
290                                            }
291                                        };
292
293                                        let _ = output
294                                            .send(Event::SpanFinished {
295                                                at,
296                                                duration,
297                                                span,
298                                            })
299                                            .await;
300                                    }
301                                }
302                            }
303                            client::Message::Quit { at } => {
304                                let _ = output
305                                    .send(Event::QuitRequested { at })
306                                    .await;
307                            }
308                        };
309                    }
310                    Err(Error::IOFailed(_)) => {
311                        let _ = output
312                            .send(Event::Disconnected {
313                                at: SystemTime::now(),
314                            })
315                            .await;
316                        break;
317                    }
318                    Err(Error::DecodingFailed(error)) => {
319                        log::warn!("Error decoding beacon output: {error}")
320                    }
321                }
322            }
323        }
324    })
325}
326
327async fn receive(
328    stream: &mut net::tcp::OwnedReadHalf,
329    buffer: &mut Vec<u8>,
330) -> Result<client::Message, Error> {
331    let size = stream.read_u64().await? as usize;
332
333    if buffer.len() < size {
334        buffer.resize(size, 0);
335    }
336
337    let _n = stream.read_exact(&mut buffer[..size]).await?;
338
339    Ok(bincode::deserialize(buffer)?)
340}
341
342async fn send(
343    stream: &mut net::tcp::OwnedWriteHalf,
344    command: client::Command,
345) -> Result<(), io::Error> {
346    let bytes = bincode::serialize(&command).expect("Encode input message");
347    let size = bytes.len() as u64;
348
349    stream.write_all(&size.to_be_bytes()).await?;
350    stream.write_all(&bytes).await?;
351    stream.flush().await?;
352
353    Ok(())
354}
355
356async fn delay() {
357    tokio::time::sleep(Duration::from_secs(2)).await;
358}