1use crate::futures::futures::{
2 Future, Sink, StreamExt,
3 channel::mpsc,
4 select,
5 task::{Context, Poll},
6};
7use crate::runtime::Action;
8use std::pin::Pin;
9
10#[derive(Debug)]
12pub struct Proxy<T: 'static> {
13 raw: winit::event_loop::EventLoopProxy<Action<T>>,
14 sender: mpsc::Sender<Action<T>>,
15 notifier: mpsc::Sender<usize>,
16}
17
18impl<T: 'static> Clone for Proxy<T> {
19 fn clone(&self) -> Self {
20 Self {
21 raw: self.raw.clone(),
22 sender: self.sender.clone(),
23 notifier: self.notifier.clone(),
24 }
25 }
26}
27
28impl<T: 'static> Proxy<T> {
29 const MAX_SIZE: usize = 100;
30
31 pub fn new(
33 raw: winit::event_loop::EventLoopProxy<Action<T>>,
34 ) -> (Self, impl Future<Output = ()>) {
35 let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE);
36 let (sender, mut receiver) = mpsc::channel(Self::MAX_SIZE);
37 let proxy = raw.clone();
38
39 let worker = async move {
40 let mut count = 0;
41
42 loop {
43 if count < Self::MAX_SIZE {
44 select! {
45 message = receiver.select_next_some() => {
46 let _ = proxy.send_event(message);
47 count += 1;
48
49 }
50 amount = processed.select_next_some() => {
51 count = count.saturating_sub(amount);
52 }
53 complete => break,
54 }
55 } else {
56 select! {
57 amount = processed.select_next_some() => {
58 count = count.saturating_sub(amount);
59 }
60 complete => break,
61 }
62 }
63 }
64 };
65
66 (
67 Self {
68 raw,
69 sender,
70 notifier,
71 },
72 worker,
73 )
74 }
75
76 pub fn send(&self, value: T) {
81 self.send_action(Action::Output(value));
82 }
83
84 pub fn send_action(&self, action: Action<T>) {
89 let _ = self.raw.send_event(action);
90 }
91
92 pub fn free_slots(&mut self, amount: usize) {
95 let _ = self.notifier.start_send(amount);
96 }
97}
98
99impl<T: 'static> Sink<Action<T>> for Proxy<T> {
100 type Error = mpsc::SendError;
101
102 fn poll_ready(
103 mut self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<Result<(), Self::Error>> {
106 self.sender.poll_ready(cx)
107 }
108
109 fn start_send(
110 mut self: Pin<&mut Self>,
111 action: Action<T>,
112 ) -> Result<(), Self::Error> {
113 self.sender.start_send(action)
114 }
115
116 fn poll_flush(
117 mut self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 ) -> Poll<Result<(), Self::Error>> {
120 match self.sender.poll_ready(cx) {
121 Poll::Ready(Err(ref e)) if e.is_disconnected() => {
122 Poll::Ready(Ok(()))
124 }
125 x => x,
126 }
127 }
128
129 fn poll_close(
130 mut self: Pin<&mut Self>,
131 _cx: &mut Context<'_>,
132 ) -> Poll<Result<(), Self::Error>> {
133 self.sender.disconnect();
134 Poll::Ready(Ok(()))
135 }
136}