From b6b51375cfd96e330d6ee22096dacf831a992aa7 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Tue, 16 Apr 2024 21:50:28 +0200 Subject: Implement backpressure mechanism in `iced_winit::Proxy` --- winit/src/proxy.rs | 108 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 93 insertions(+), 15 deletions(-) (limited to 'winit/src/proxy.rs') diff --git a/winit/src/proxy.rs b/winit/src/proxy.rs index 1d6c48bb..a35e8a27 100644 --- a/winit/src/proxy.rs +++ b/winit/src/proxy.rs @@ -1,28 +1,101 @@ use crate::futures::futures::{ channel::mpsc, + stream, task::{Context, Poll}, - Sink, + Future, Sink, StreamExt, }; use std::pin::Pin; -/// An event loop proxy that implements `Sink`. +/// An event loop proxy with backpressure that implements `Sink`. #[derive(Debug)] pub struct Proxy { raw: winit::event_loop::EventLoopProxy, + sender: mpsc::Sender, + notifier: mpsc::Sender, } impl Clone for Proxy { fn clone(&self) -> Self { Self { raw: self.raw.clone(), + sender: self.sender.clone(), + notifier: self.notifier.clone(), } } } impl Proxy { + const MAX_SIZE: usize = 100; + /// Creates a new [`Proxy`] from an `EventLoopProxy`. - pub fn new(raw: winit::event_loop::EventLoopProxy) -> Self { - Self { raw } + pub fn new( + raw: winit::event_loop::EventLoopProxy, + ) -> (Self, impl Future) { + let (notifier, processed) = mpsc::channel(Self::MAX_SIZE); + let (sender, receiver) = mpsc::channel(Self::MAX_SIZE); + let proxy = raw.clone(); + + let worker = async move { + enum Item { + MessageProduced(T), + BatchProcessed(usize), + } + + let mut receiver = receiver.map(Item::MessageProduced); + let mut processed = processed.map(Item::BatchProcessed); + + let mut count = 0; + + loop { + if count < Self::MAX_SIZE { + let mut stream = + stream::select(receiver.by_ref(), processed.by_ref()); + + match stream.select_next_some().await { + Item::MessageProduced(message) => { + let _ = proxy.send_event(message); + + count += 1; + } + Item::BatchProcessed(amount) => { + count = count.saturating_sub(amount); + } + } + } else if let Item::BatchProcessed(amount) = + processed.select_next_some().await + { + count = count.saturating_sub(amount); + } + } + }; + + ( + Self { + raw, + sender, + notifier, + }, + worker, + ) + } + + /// Sends a `Message` to the event loop. + /// + /// Note: This skips the backpressure mechanism with an unbounded + /// channel. Use sparingly! + pub fn send(&mut self, message: Message) + where + Message: std::fmt::Debug, + { + self.raw + .send_event(message) + .expect("Send message to event loop"); + } + + /// Frees an amount of slots for additional messages to be queued in + /// this [`Proxy`]. + pub fn free_slots(&mut self, amount: usize) { + let _ = self.notifier.start_send(amount); } } @@ -30,32 +103,37 @@ impl Sink for Proxy { type Error = mpsc::SendError; fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, ) -> Poll> { - Poll::Ready(Ok(())) + self.sender.poll_ready(cx) } fn start_send( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, message: Message, ) -> Result<(), Self::Error> { - let _ = self.raw.send_event(message); - - Ok(()) + self.sender.start_send(message) } fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, ) -> Poll> { - Poll::Ready(Ok(())) + match self.sender.poll_ready(cx) { + Poll::Ready(Err(ref e)) if e.is_disconnected() => { + // If the receiver disconnected, we consider the sink to be flushed. + Poll::Ready(Ok(())) + } + x => x, + } } fn poll_close( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { + self.sender.disconnect(); Poll::Ready(Ok(())) } } -- cgit From a05b8044a9a82c1802d4d97f1723e24b9d9dad9c Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Wed, 17 Apr 2024 15:54:12 +0200 Subject: Fix `SelectNextSome` poll after termination panic in `iced_winit::Proxy` --- winit/src/proxy.rs | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) (limited to 'winit/src/proxy.rs') diff --git a/winit/src/proxy.rs b/winit/src/proxy.rs index a35e8a27..3edc30ad 100644 --- a/winit/src/proxy.rs +++ b/winit/src/proxy.rs @@ -1,6 +1,6 @@ use crate::futures::futures::{ channel::mpsc, - stream, + select, task::{Context, Poll}, Future, Sink, StreamExt, }; @@ -31,40 +31,33 @@ impl Proxy { pub fn new( raw: winit::event_loop::EventLoopProxy, ) -> (Self, impl Future) { - let (notifier, processed) = mpsc::channel(Self::MAX_SIZE); - let (sender, receiver) = mpsc::channel(Self::MAX_SIZE); + let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE); + let (sender, mut receiver) = mpsc::channel(Self::MAX_SIZE); let proxy = raw.clone(); let worker = async move { - enum Item { - MessageProduced(T), - BatchProcessed(usize), - } - - let mut receiver = receiver.map(Item::MessageProduced); - let mut processed = processed.map(Item::BatchProcessed); - let mut count = 0; loop { if count < Self::MAX_SIZE { - let mut stream = - stream::select(receiver.by_ref(), processed.by_ref()); - - match stream.select_next_some().await { - Item::MessageProduced(message) => { + select! { + message = receiver.select_next_some() => { let _ = proxy.send_event(message); - count += 1; + + } + amount = processed.select_next_some() => { + count = count.saturating_sub(amount); } - Item::BatchProcessed(amount) => { + complete => break, + } + } else { + select! { + amount = processed.select_next_some() => { count = count.saturating_sub(amount); } + complete => break, } - } else if let Item::BatchProcessed(amount) = - processed.select_next_some().await - { - count = count.saturating_sub(amount); } } }; -- cgit