1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
/// A cloneable handle used to queue commands for a connected client.
///
/// It only wraps the sending half of a command channel; the receiving half
/// is drained by a task spawned in `run`, which writes each command to the
/// client's socket.
#[derive(Debug, Clone)]
pub struct Connection {
    /// Sending half of the per-connection command channel.
    commands: mpsc::Sender<client::Command>,
}
28
29impl Connection {
30 pub fn rewind_to<'a>(&self, message: usize) -> impl Future<Output = ()> + 'a {
31 let commands = self.commands.clone();
32
33 async move {
34 let _ = commands.send(client::Command::RewindTo { message }).await;
35 }
36 }
37
38 pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
39 let commands = self.commands.clone();
40
41 async move {
42 let _ = commands.send(client::Command::GoLive).await;
43 }
44 }
45}
46
/// An event produced by the stream returned from [`run`].
///
/// Every variant carries an `at` timestamp, which [`Event::at`] exposes
/// uniformly.
#[derive(Debug, Clone)]
pub enum Event {
    /// A client sent its `Connected` message.
    Connected {
        /// Handle for queueing commands to this client.
        connection: Connection,
        /// Timestamp forwarded from the client's message.
        at: SystemTime,
        /// Name forwarded from the client's message.
        name: String,
        /// Version forwarded from the client's message.
        version: Version,
        /// Theme palette forwarded from the client's message, if any.
        theme: Option<theme::Palette>,
        /// Time-travel flag forwarded from the client's message.
        can_time_travel: bool,
    },
    /// Reading from the client failed with an I/O error.
    Disconnected {
        /// Local time at which the failure was observed.
        at: SystemTime,
    },
    /// The client logged a theme change.
    ThemeChanged {
        /// Timestamp forwarded from the client's message.
        at: SystemTime,
        /// The palette reported by the client.
        palette: theme::Palette,
    },
    /// The client reported that a span finished.
    SpanFinished {
        /// Timestamp forwarded from the client's message.
        at: SystemTime,
        /// Duration reported for the finished span.
        duration: Duration,
        /// The finished span, enriched with data logged since the previous one.
        span: Span,
    },
    /// The client sent a `Quit` message.
    QuitRequested {
        /// Timestamp forwarded from the client's message.
        at: SystemTime,
    },
    /// Binding the server address failed because it is already in use.
    AlreadyRunning {
        /// Local time at which the failed bind was observed.
        at: SystemTime,
    },
}
76
77impl Event {
78 pub fn at(&self) -> SystemTime {
79 match self {
80 Self::Connected { at, .. }
81 | Self::Disconnected { at, .. }
82 | Self::ThemeChanged { at, .. }
83 | Self::SpanFinished { at, .. }
84 | Self::QuitRequested { at }
85 | Self::AlreadyRunning { at } => *at,
86 }
87 }
88}
89
90pub fn is_running() -> bool {
91 std::net::TcpListener::bind(client::server_address_from_env()).is_err()
92}
93
/// Runs the server and returns the [`Event`]s it observes as a stream.
///
/// The stream body first binds a TCP listener on
/// `client::server_address_from_env()`, waiting for `delay()` and retrying
/// whenever the bind fails (and emitting [`Event::AlreadyRunning`] when the
/// failure is `AddrInUse`). It then serves one connection at a time: every
/// message decoded from the client is translated into an [`Event`] until
/// reading fails with an I/O error, at which point [`Event::Disconnected`]
/// is emitted and the next connection is accepted.
pub fn run() -> impl Stream<Item = Event> {
    stream::channel(|mut output| async move {
        // Scratch buffer passed to `receive`; it is reused across messages
        // and across connections.
        let mut buffer = Vec::new();

        // Retry binding until it succeeds.
        let server = loop {
            match net::TcpListener::bind(client::server_address_from_env()).await {
                Ok(server) => break server,
                Err(error) => {
                    // Only an address that is already taken is reported;
                    // any other bind error is retried silently.
                    if error.kind() == io::ErrorKind::AddrInUse {
                        let _ = output
                            .send(Event::AlreadyRunning {
                                at: SystemTime::now(),
                            })
                            .await;
                    }
                    delay().await;
                }
            };
        };

        loop {
            // A failed accept is ignored and retried immediately.
            let Ok((stream, _)) = server.accept().await else {
                continue;
            };

            let (mut reader, mut writer) = {
                let _ = stream.set_nodelay(true);
                stream.into_split()
            };

            // Per-connection state. The `last_*` values are filled in by
            // logged events and attached to the next span that finishes.
            let (command_sender, mut command_receiver) = mpsc::channel(1);
            let mut last_message = String::new();
            let mut last_update_number = 0;
            let mut last_tasks = 0;
            let mut last_subscriptions = 0;
            let mut last_present_layers = 0;
            let mut last_prepare = present::Stage::default();
            let mut last_render = present::Stage::default();

            // Writer task: forwards queued commands to the client. The join
            // handle is dropped, so the task runs detached until the command
            // channel closes.
            drop(task::spawn(async move {
                let mut last_message_number = None;

                while let Some(command) = command_receiver.recv().await {
                    match command {
                        client::Command::RewindTo { message } => {
                            // Skip a rewind to the message we already
                            // rewound to last.
                            if Some(message) == last_message_number {
                                continue;
                            }

                            last_message_number = Some(message);
                        }
                        client::Command::GoLive => {
                            last_message_number = None;
                        }
                    }

                    // Write failures are logged but do not stop the task.
                    let _ = send(&mut writer, command)
                        .await
                        .inspect_err(|error| log::error!("Error when sending command: {error}"));
                }
            }));

            // Reader loop for this connection.
            loop {
                match receive(&mut reader, &mut buffer).await {
                    Ok(message) => {
                        match message {
                            client::Message::Connected {
                                at,
                                name,
                                version,
                                theme,
                                can_time_travel,
                            } => {
                                let _ = output
                                    .send(Event::Connected {
                                        connection: Connection {
                                            commands: command_sender.clone(),
                                        },
                                        at,
                                        name,
                                        version,
                                        theme,
                                        can_time_travel,
                                    })
                                    .await;
                            }
                            client::Message::EventLogged { at, event } => match event {
                                client::Event::ThemeChanged(palette) => {
                                    let _ = output.send(Event::ThemeChanged { at, palette }).await;
                                }
                                // The following events only update state that
                                // is reported with a later finished span.
                                client::Event::SubscriptionsTracked(amount_alive) => {
                                    last_subscriptions = amount_alive;
                                }
                                client::Event::MessageLogged { number, message } => {
                                    last_update_number = number;
                                    last_message = message;
                                }
                                client::Event::CommandsSpawned(commands) => {
                                    last_tasks = commands;
                                }
                                client::Event::LayersRendered(layers) => {
                                    last_present_layers = layers;
                                }
                                // A new update span starts from a clean
                                // message and task count.
                                client::Event::SpanStarted(span::Stage::Update) => {
                                    last_message.clear();
                                    last_tasks = 0;
                                }
                                client::Event::SpanStarted(_) => {}
                                client::Event::SpanFinished(stage, duration) => {
                                    let span = match stage {
                                        span::Stage::Boot => Span::Boot,
                                        span::Stage::Update => Span::Update {
                                            number: last_update_number,
                                            message: last_message.clone(),
                                            tasks: last_tasks,
                                            subscriptions: last_subscriptions,
                                        },
                                        span::Stage::View(window) => Span::View { window },
                                        span::Stage::Layout(window) => Span::Layout { window },
                                        span::Stage::Interact(window) => Span::Interact { window },
                                        span::Stage::Draw(window) => Span::Draw { window },
                                        // Prepare/render spans are not emitted
                                        // on their own: their durations are
                                        // summed per primitive and reported
                                        // with the next `Present` span.
                                        span::Stage::Prepare(primitive)
                                        | span::Stage::Render(primitive) => {
                                            let stage = if matches!(stage, span::Stage::Prepare(_),)
                                            {
                                                &mut last_prepare
                                            } else {
                                                &mut last_render
                                            };

                                            let primitive = match primitive {
                                                present::Primitive::Quad => &mut stage.quads,
                                                present::Primitive::Triangle => {
                                                    &mut stage.triangles
                                                }
                                                present::Primitive::Shader => &mut stage.shaders,
                                                present::Primitive::Text => &mut stage.text,
                                                present::Primitive::Image => &mut stage.images,
                                            };

                                            *primitive += duration;

                                            // Go back to reading without
                                            // emitting an event.
                                            continue;
                                        }
                                        span::Stage::Present(window) => {
                                            let span = Span::Present {
                                                window,
                                                prepare: last_prepare,
                                                render: last_render,
                                                layers: last_present_layers,
                                            };

                                            // Reset the accumulators for the
                                            // next presentation.
                                            last_prepare = present::Stage::default();
                                            last_render = present::Stage::default();
                                            last_present_layers = 0;

                                            span
                                        }
                                        span::Stage::Custom(name) => Span::Custom { name },
                                    };

                                    let _ = output
                                        .send(Event::SpanFinished { at, duration, span })
                                        .await;
                                }
                            },
                            client::Message::Quit { at } => {
                                let _ = output.send(Event::QuitRequested { at }).await;
                            }
                        };
                    }
                    // An I/O failure ends this connection; go back to
                    // accepting the next one.
                    Err(Error::IOFailed(_)) => {
                        let _ = output
                            .send(Event::Disconnected {
                                at: SystemTime::now(),
                            })
                            .await;
                        break;
                    }
                    // A message that fails to decode is logged and skipped;
                    // the connection stays open.
                    Err(Error::DecodingFailed(error)) => {
                        log::warn!("Error decoding beacon output: {error}")
                    }
                }
            }
        }
    })
}
281
282async fn receive(
283 stream: &mut net::tcp::OwnedReadHalf,
284 buffer: &mut Vec<u8>,
285) -> Result<client::Message, Error> {
286 let size = stream.read_u64().await? as usize;
287
288 if buffer.len() < size {
289 buffer.resize(size, 0);
290 }
291
292 let _n = stream.read_exact(&mut buffer[..size]).await?;
293
294 Ok(bincode::deserialize(buffer)?)
295}
296
297async fn send(
298 stream: &mut net::tcp::OwnedWriteHalf,
299 command: client::Command,
300) -> Result<(), io::Error> {
301 let bytes = bincode::serialize(&command).expect("Encode input message");
302 let size = bytes.len() as u64;
303
304 stream.write_all(&size.to_be_bytes()).await?;
305 stream.write_all(&bytes).await?;
306 stream.flush().await?;
307
308 Ok(())
309}
310
311async fn delay() {
312 tokio::time::sleep(Duration::from_secs(2)).await;
313}