1use crate::futures::futures::{
2 Future, Sink, StreamExt,
3 channel::mpsc,
4 select,
5 task::{Context, Poll},
6};
7use crate::graphics::shell;
8use crate::runtime::Action;
9use crate::runtime::window;
10use std::pin::Pin;
11
12#[derive(Debug)]
14pub struct Proxy<T: 'static> {
15 raw: winit::event_loop::EventLoopProxy<Action<T>>,
16 sender: mpsc::Sender<Action<T>>,
17 notifier: mpsc::Sender<usize>,
18}
19
20impl<T: 'static> Clone for Proxy<T> {
21 fn clone(&self) -> Self {
22 Self {
23 raw: self.raw.clone(),
24 sender: self.sender.clone(),
25 notifier: self.notifier.clone(),
26 }
27 }
28}
29
30impl<T: 'static> Proxy<T> {
31 const MAX_SIZE: usize = 100;
32
33 pub fn new(
35 raw: winit::event_loop::EventLoopProxy<Action<T>>,
36 ) -> (Self, impl Future<Output = ()>) {
37 let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE);
38 let (sender, mut receiver) = mpsc::channel(Self::MAX_SIZE);
39 let proxy = raw.clone();
40
41 let worker = async move {
42 let mut count = 0;
43
44 loop {
45 if count < Self::MAX_SIZE {
46 select! {
47 message = receiver.select_next_some() => {
48 let _ = proxy.send_event(message);
49 count += 1;
50
51 }
52 amount = processed.select_next_some() => {
53 count = count.saturating_sub(amount);
54 }
55 complete => break,
56 }
57 } else {
58 select! {
59 amount = processed.select_next_some() => {
60 count = count.saturating_sub(amount);
61 }
62 complete => break,
63 }
64 }
65 }
66 };
67
68 (
69 Self {
70 raw,
71 sender,
72 notifier,
73 },
74 worker,
75 )
76 }
77
78 pub fn send(&self, value: T) {
83 self.send_action(Action::Output(value));
84 }
85
86 pub fn send_action(&self, action: Action<T>) {
91 let _ = self.raw.send_event(action);
92 }
93
94 pub fn free_slots(&mut self, amount: usize) {
97 let _ = self.notifier.start_send(amount);
98 }
99}
100
101impl<T: 'static> Sink<Action<T>> for Proxy<T> {
102 type Error = mpsc::SendError;
103
104 fn poll_ready(
105 mut self: Pin<&mut Self>,
106 cx: &mut Context<'_>,
107 ) -> Poll<Result<(), Self::Error>> {
108 self.sender.poll_ready(cx)
109 }
110
111 fn start_send(
112 mut self: Pin<&mut Self>,
113 action: Action<T>,
114 ) -> Result<(), Self::Error> {
115 self.sender.start_send(action)
116 }
117
118 fn poll_flush(
119 mut self: Pin<&mut Self>,
120 cx: &mut Context<'_>,
121 ) -> Poll<Result<(), Self::Error>> {
122 match self.sender.poll_ready(cx) {
123 Poll::Ready(Err(ref e)) if e.is_disconnected() => {
124 Poll::Ready(Ok(()))
126 }
127 x => x,
128 }
129 }
130
131 fn poll_close(
132 mut self: Pin<&mut Self>,
133 _cx: &mut Context<'_>,
134 ) -> Poll<Result<(), Self::Error>> {
135 self.sender.disconnect();
136 Poll::Ready(Ok(()))
137 }
138}
139
140impl<T> shell::Notifier for Proxy<T>
141where
142 T: Send,
143{
144 fn request_redraw(&self) {
145 self.send_action(Action::Window(window::Action::RedrawAll));
146 }
147
148 fn invalidate_layout(&self) {
149 self.send_action(Action::Window(window::Action::RelayoutAll));
150 }
151}