1use crate::futures::futures::{
2 Future, Sink, StreamExt,
3 channel::mpsc,
4 select,
5 task::{Context, Poll},
6};
7use crate::graphics::shell;
8use crate::runtime::Action;
9use crate::runtime::window;
10use std::pin::Pin;
11
12#[derive(Debug)]
14pub struct Proxy<T: 'static> {
15 raw: winit::event_loop::EventLoopProxy<Action<T>>,
16 sender: mpsc::Sender<Action<T>>,
17 notifier: mpsc::Sender<usize>,
18}
19
20impl<T: 'static> Clone for Proxy<T> {
21 fn clone(&self) -> Self {
22 Self {
23 raw: self.raw.clone(),
24 sender: self.sender.clone(),
25 notifier: self.notifier.clone(),
26 }
27 }
28}
29
30impl<T: 'static> Proxy<T> {
31 const MAX_SIZE: usize = 100;
32
33 pub fn new(
35 raw: winit::event_loop::EventLoopProxy<Action<T>>,
36 ) -> (Self, impl Future<Output = ()>) {
37 let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE);
38 let (sender, mut receiver) = mpsc::channel(Self::MAX_SIZE);
39 let proxy = raw.clone();
40
41 let worker = async move {
42 let mut count = 0;
43
44 loop {
45 if count < Self::MAX_SIZE {
46 select! {
47 message = receiver.select_next_some() => {
48 let _ = proxy.send_event(message);
49 count += 1;
50 }
51 amount = processed.select_next_some() => {
52 count = count.saturating_sub(amount);
53 }
54 complete => break,
55 }
56 } else {
57 select! {
58 amount = processed.select_next_some() => {
59 count = count.saturating_sub(amount);
60 }
61 complete => break,
62 }
63 }
64 }
65 };
66
67 (
68 Self {
69 raw,
70 sender,
71 notifier,
72 },
73 worker,
74 )
75 }
76
77 pub fn input(&self) -> mpsc::Sender<Action<T>> {
79 self.sender.clone()
80 }
81
82 pub fn send(&self, value: T) {
87 self.send_action(Action::Output(value));
88 }
89
90 pub fn send_action(&self, action: Action<T>) {
95 let _ = self.raw.send_event(action);
96 }
97
98 pub fn free_slots(&mut self, amount: usize) {
101 let _ = self.notifier.start_send(amount);
102 }
103}
104
105impl<T: 'static> Sink<Action<T>> for Proxy<T> {
106 type Error = mpsc::SendError;
107
108 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109 self.sender.poll_ready(cx)
110 }
111
112 fn start_send(mut self: Pin<&mut Self>, action: Action<T>) -> Result<(), Self::Error> {
113 self.sender.start_send(action)
114 }
115
116 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
117 match self.sender.poll_ready(cx) {
118 Poll::Ready(Err(ref e)) if e.is_disconnected() => {
119 Poll::Ready(Ok(()))
121 }
122 x => x,
123 }
124 }
125
126 fn poll_close(
127 mut self: Pin<&mut Self>,
128 _cx: &mut Context<'_>,
129 ) -> Poll<Result<(), Self::Error>> {
130 self.sender.disconnect();
131 Poll::Ready(Ok(()))
132 }
133}
134
135impl<T> shell::Notifier for Proxy<T>
136where
137 T: Send,
138{
139 fn tick(&self) {
140 self.send_action(Action::Tick);
141 }
142
143 fn request_redraw(&self) {
144 self.send_action(Action::Window(window::Action::RedrawAll));
145 }
146
147 fn invalidate_layout(&self) {
148 self.send_action(Action::Window(window::Action::RelayoutAll));
149 }
150}