1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
24#[derive(Debug, Clone)]
25pub struct Connection {
26 commands: mpsc::Sender<client::Command>,
27}
28
29impl Connection {
30 pub fn rewind_to<'a>(
31 &self,
32 message: usize,
33 ) -> impl Future<Output = ()> + 'a {
34 let commands = self.commands.clone();
35
36 async move {
37 let _ = commands.send(client::Command::RewindTo { message }).await;
38 }
39 }
40
41 pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
42 let commands = self.commands.clone();
43
44 async move {
45 let _ = commands.send(client::Command::GoLive).await;
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
51pub enum Event {
52 Connected {
53 connection: Connection,
54 at: SystemTime,
55 name: String,
56 version: Version,
57 theme: Option<theme::Palette>,
58 can_time_travel: bool,
59 },
60 Disconnected {
61 at: SystemTime,
62 },
63 ThemeChanged {
64 at: SystemTime,
65 palette: theme::Palette,
66 },
67 SpanFinished {
68 at: SystemTime,
69 duration: Duration,
70 span: Span,
71 },
72 QuitRequested {
73 at: SystemTime,
74 },
75 AlreadyRunning {
76 at: SystemTime,
77 },
78}
79
80impl Event {
81 pub fn at(&self) -> SystemTime {
82 match self {
83 Self::Connected { at, .. }
84 | Self::Disconnected { at, .. }
85 | Self::ThemeChanged { at, .. }
86 | Self::SpanFinished { at, .. }
87 | Self::QuitRequested { at }
88 | Self::AlreadyRunning { at } => *at,
89 }
90 }
91}
92
93pub fn is_running() -> bool {
94 std::net::TcpListener::bind(client::SERVER_ADDRESS).is_err()
95}
96
97pub fn run() -> impl Stream<Item = Event> {
98 stream::channel(|mut output| async move {
99 let mut buffer = Vec::new();
100
101 let server = loop {
102 match net::TcpListener::bind(client::SERVER_ADDRESS).await {
103 Ok(server) => break server,
104 Err(error) => {
105 if error.kind() == io::ErrorKind::AddrInUse {
106 let _ = output
107 .send(Event::AlreadyRunning {
108 at: SystemTime::now(),
109 })
110 .await;
111 }
112 delay().await;
113 }
114 };
115 };
116
117 loop {
118 let Ok((stream, _)) = server.accept().await else {
119 continue;
120 };
121
122 let (mut reader, mut writer) = {
123 let _ = stream.set_nodelay(true);
124 stream.into_split()
125 };
126
127 let (command_sender, mut command_receiver) = mpsc::channel(1);
128 let mut last_message = String::new();
129 let mut last_update_number = 0;
130 let mut last_tasks = 0;
131 let mut last_subscriptions = 0;
132 let mut last_present_layers = 0;
133 let mut last_prepare = present::Stage::default();
134 let mut last_render = present::Stage::default();
135
136 drop(task::spawn(async move {
137 let mut last_message_number = None;
138
139 while let Some(command) = command_receiver.recv().await {
140 match command {
141 client::Command::RewindTo { message } => {
142 if Some(message) == last_message_number {
143 continue;
144 }
145
146 last_message_number = Some(message);
147 }
148 client::Command::GoLive => {
149 last_message_number = None;
150 }
151 }
152
153 let _ =
154 send(&mut writer, command).await.inspect_err(|error| {
155 log::error!("Error when sending command: {error}")
156 });
157 }
158 }));
159
160 loop {
161 match receive(&mut reader, &mut buffer).await {
162 Ok(message) => {
163 match message {
164 client::Message::Connected {
165 at,
166 name,
167 version,
168 theme,
169 can_time_travel,
170 } => {
171 let _ = output
172 .send(Event::Connected {
173 connection: Connection {
174 commands: command_sender.clone(),
175 },
176 at,
177 name,
178 version,
179 theme,
180 can_time_travel,
181 })
182 .await;
183 }
184 client::Message::EventLogged { at, event } => {
185 match event {
186 client::Event::ThemeChanged(palette) => {
187 let _ = output
188 .send(Event::ThemeChanged {
189 at,
190 palette,
191 })
192 .await;
193 }
194 client::Event::SubscriptionsTracked(
195 amount_alive,
196 ) => {
197 last_subscriptions = amount_alive;
198 }
199 client::Event::MessageLogged {
200 number,
201 message,
202 } => {
203 last_update_number = number;
204 last_message = message;
205 }
206 client::Event::CommandsSpawned(
207 commands,
208 ) => {
209 last_tasks = commands;
210 }
211 client::Event::LayersRendered(layers) => {
212 last_present_layers = layers;
213 }
214 client::Event::SpanStarted(
215 span::Stage::Update,
216 ) => {
217 last_message.clear();
218 last_tasks = 0;
219 }
220 client::Event::SpanStarted(_) => {}
221 client::Event::SpanFinished(
222 stage,
223 duration,
224 ) => {
225 let span = match stage {
226 span::Stage::Boot => Span::Boot,
227 span::Stage::Update => {
228 Span::Update {
229 number: last_update_number,
230 message: last_message
231 .clone(),
232 tasks: last_tasks,
233 subscriptions:
234 last_subscriptions,
235 }
236 }
237 span::Stage::View(window) => {
238 Span::View { window }
239 }
240 span::Stage::Layout(window) => {
241 Span::Layout { window }
242 }
243 span::Stage::Interact(window) => {
244 Span::Interact { window }
245 }
246 span::Stage::Draw(window) => {
247 Span::Draw { window }
248 }
249 span::Stage::Prepare(primitive)
250 | span::Stage::Render(primitive) => {
251 let stage = if matches!(
252 stage,
253 span::Stage::Prepare(_),
254 ) {
255 &mut last_prepare
256 } else {
257 &mut last_render
258 };
259
260 let primitive = match primitive {
261 present::Primitive::Quad => &mut stage.quads,
262 present::Primitive::Triangle => &mut stage.triangles,
263 present::Primitive::Shader => &mut stage.shaders,
264 present::Primitive::Text => &mut stage.text,
265 present::Primitive::Image => &mut stage.images,
266 };
267
268 *primitive += duration;
269
270 continue;
271 }
272 span::Stage::Present(window) => {
273 let span = Span::Present {
274 window,
275 prepare: last_prepare,
276 render: last_render,
277 layers: last_present_layers,
278 };
279
280 last_prepare =
281 present::Stage::default();
282 last_render =
283 present::Stage::default();
284 last_present_layers = 0;
285
286 span
287 }
288 span::Stage::Custom(name) => {
289 Span::Custom { name }
290 }
291 };
292
293 let _ = output
294 .send(Event::SpanFinished {
295 at,
296 duration,
297 span,
298 })
299 .await;
300 }
301 }
302 }
303 client::Message::Quit { at } => {
304 let _ = output
305 .send(Event::QuitRequested { at })
306 .await;
307 }
308 };
309 }
310 Err(Error::IOFailed(_)) => {
311 let _ = output
312 .send(Event::Disconnected {
313 at: SystemTime::now(),
314 })
315 .await;
316 break;
317 }
318 Err(Error::DecodingFailed(error)) => {
319 log::warn!("Error decoding beacon output: {error}")
320 }
321 }
322 }
323 }
324 })
325}
326
327async fn receive(
328 stream: &mut net::tcp::OwnedReadHalf,
329 buffer: &mut Vec<u8>,
330) -> Result<client::Message, Error> {
331 let size = stream.read_u64().await? as usize;
332
333 if buffer.len() < size {
334 buffer.resize(size, 0);
335 }
336
337 let _n = stream.read_exact(&mut buffer[..size]).await?;
338
339 Ok(bincode::deserialize(buffer)?)
340}
341
342async fn send(
343 stream: &mut net::tcp::OwnedWriteHalf,
344 command: client::Command,
345) -> Result<(), io::Error> {
346 let bytes = bincode::serialize(&command).expect("Encode input message");
347 let size = bytes.len() as u64;
348
349 stream.write_all(&size.to_be_bytes()).await?;
350 stream.write_all(&bytes).await?;
351 stream.flush().await?;
352
353 Ok(())
354}
355
356async fn delay() {
357 tokio::time::sleep(Duration::from_secs(2)).await;
358}