diff options
author | 2024-04-16 21:50:28 +0200 | |
---|---|---|
committer | 2024-04-16 21:50:28 +0200 | |
commit | b6b51375cfd96e330d6ee22096dacf831a992aa7 (patch) | |
tree | 293ec9c82eedc982b5c166ae0295494a8010ad47 | |
parent | e8ec6b94b68801ce4e95ada7c311320469f92a96 (diff) | |
download | iced-b6b51375cfd96e330d6ee22096dacf831a992aa7.tar.gz iced-b6b51375cfd96e330d6ee22096dacf831a992aa7.tar.bz2 iced-b6b51375cfd96e330d6ee22096dacf831a992aa7.zip |
Implement backpressure mechanism in `iced_winit::Proxy`
-rw-r--r-- | winit/src/application.rs | 79 | ||||
-rw-r--r-- | winit/src/multi_window.rs | 77 | ||||
-rw-r--r-- | winit/src/proxy.rs | 108 |
3 files changed, 152 insertions, 112 deletions
diff --git a/winit/src/application.rs b/winit/src/application.rs index 1ca80609..a447c9da 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -149,13 +149,14 @@ where let event_loop = EventLoopBuilder::with_user_event() .build() .expect("Create event loop"); - let proxy = event_loop.create_proxy(); + + let (proxy, worker) = Proxy::new(event_loop.create_proxy()); let runtime = { - let proxy = Proxy::new(event_loop.create_proxy()); let executor = E::new().map_err(Error::ExecutorCreationFailed)?; + executor.spawn(worker); - Runtime::new(executor, proxy) + Runtime::new(executor, proxy.clone()) }; let (application, init_command) = { @@ -305,7 +306,7 @@ async fn run_instance<A, E, C>( mut compositor: C, mut renderer: A::Renderer, mut runtime: Runtime<E, Proxy<A::Message>, A::Message>, - mut proxy: winit::event_loop::EventLoopProxy<A::Message>, + mut proxy: Proxy<A::Message>, mut debug: Debug, mut event_receiver: mpsc::UnboundedReceiver< winit::event::Event<A::Message>, @@ -370,6 +371,7 @@ async fn run_instance<A, E, C>( let mut mouse_interaction = mouse::Interaction::default(); let mut events = Vec::new(); let mut messages = Vec::new(); + let mut user_events = 0; let mut redraw_pending = false; debug.startup_finished(); @@ -396,6 +398,7 @@ async fn run_instance<A, E, C>( } event::Event::UserEvent(message) => { messages.push(message); + user_events += 1; } event::Event::WindowEvent { event: event::WindowEvent::RedrawRequested { .. }, @@ -593,6 +596,11 @@ async fn run_instance<A, E, C>( if should_exit { break; } + + if user_events > 0 { + proxy.free_slots(user_events); + user_events = 0; + } } if !redraw_pending { @@ -667,7 +675,7 @@ pub fn update<A: Application, C, E: Executor>( runtime: &mut Runtime<E, Proxy<A::Message>, A::Message>, clipboard: &mut Clipboard, should_exit: &mut bool, - proxy: &mut winit::event_loop::EventLoopProxy<A::Message>, + proxy: &mut Proxy<A::Message>, debug: &mut Debug, messages: &mut Vec<A::Message>, window: &winit::window::Window, @@ -717,7 +725,7 @@ pub fn run_command<A, C, E>( runtime: &mut Runtime<E, Proxy<A::Message>, A::Message>, clipboard: &mut Clipboard, should_exit: &mut bool, - proxy: &mut winit::event_loop::EventLoopProxy<A::Message>, + proxy: &mut Proxy<A::Message>, debug: &mut Debug, window: &winit::window::Window, ) where @@ -742,9 +750,7 @@ pub fn run_command<A, C, E>( clipboard::Action::Read(tag, kind) => { let message = tag(clipboard.read(kind)); - proxy - .send_event(message) - .expect("Send message to event loop"); + proxy.send(message); } clipboard::Action::Write(contents, kind) => { clipboard.write(kind, contents); @@ -774,25 +780,16 @@ pub fn run_command<A, C, E>( let size = window.inner_size().to_logical(window.scale_factor()); - proxy - .send_event(callback(Size::new( - size.width, - size.height, - ))) - .expect("Send message to event loop"); + proxy.send(callback(Size::new(size.width, size.height))); } window::Action::FetchMaximized(_id, callback) => { - proxy - .send_event(callback(window.is_maximized())) - .expect("Send message to event loop"); + proxy.send(callback(window.is_maximized())); } window::Action::Maximize(_id, maximized) => { window.set_maximized(maximized); } window::Action::FetchMinimized(_id, callback) => { - proxy - .send_event(callback(window.is_minimized())) - .expect("Send message to event loop"); + proxy.send(callback(window.is_minimized())); } window::Action::Minimize(_id, minimized) => { window.set_minimized(minimized); @@ -808,9 +805,7 @@ pub fn run_command<A, C, E>( }) .ok(); - proxy - .send_event(callback(position)) - .expect("Send message to event loop"); + proxy.send(callback(position)); } window::Action::Move(_id, position) => { window.set_outer_position(winit::dpi::LogicalPosition { @@ -835,9 +830,7 @@ pub fn run_command<A, C, E>( core::window::Mode::Hidden }; - proxy - .send_event(tag(mode)) - .expect("Send message to event loop"); + proxy.send(tag(mode)); } window::Action::ToggleMaximize(_id) => { window.set_maximized(!window.is_maximized()); @@ -865,17 +858,13 @@ pub fn run_command<A, C, E>( } } window::Action::FetchId(_id, tag) => { - proxy - .send_event(tag(window.id().into())) - .expect("Send message to event loop"); + proxy.send(tag(window.id().into())); } window::Action::RunWithHandle(_id, tag) => { use window::raw_window_handle::HasWindowHandle; if let Ok(handle) = window.window_handle() { - proxy - .send_event(tag(&handle)) - .expect("Send message to event loop"); + proxy.send(tag(&handle)); } } @@ -888,12 +877,10 @@ pub fn run_command<A, C, E>( &debug.overlay(), ); - proxy - .send_event(tag(window::Screenshot::new( - bytes, - state.physical_size(), - ))) - .expect("Send message to event loop."); + proxy.send(tag(window::Screenshot::new( + bytes, + state.physical_size(), + ))); } }, command::Action::System(action) => match action { @@ -901,7 +888,7 @@ pub fn run_command<A, C, E>( #[cfg(feature = "system")] { let graphics_info = compositor.fetch_information(); - let proxy = proxy.clone(); + let mut proxy = proxy.clone(); let _ = std::thread::spawn(move || { let information = @@ -909,9 +896,7 @@ pub fn run_command<A, C, E>( let message = _tag(information); - proxy - .send_event(message) - .expect("Send message to event loop"); + proxy.send(message); }); } } @@ -934,9 +919,7 @@ pub fn run_command<A, C, E>( match operation.finish() { operation::Outcome::None => {} operation::Outcome::Some(message) => { - proxy - .send_event(message) - .expect("Send message to event loop"); + proxy.send(message); } operation::Outcome::Chain(next) => { current_operation = Some(next); @@ -951,9 +934,7 @@ pub fn run_command<A, C, E>( // TODO: Error handling (?) compositor.load_font(bytes); - proxy - .send_event(tagger(Ok(()))) - .expect("Send message to event loop"); + proxy.send(tagger(Ok(()))); } command::Action::Custom(_) => { log::warn!("Unsupported custom action in `iced_winit` shell"); diff --git a/winit/src/multi_window.rs b/winit/src/multi_window.rs index 3537ac18..f832eb81 100644 --- a/winit/src/multi_window.rs +++ b/winit/src/multi_window.rs @@ -123,13 +123,13 @@ where .build() .expect("Create event loop"); - let proxy = event_loop.create_proxy(); + let (proxy, worker) = Proxy::new(event_loop.create_proxy()); let runtime = { - let proxy = Proxy::new(event_loop.create_proxy()); let executor = E::new().map_err(Error::ExecutorCreationFailed)?; + executor.spawn(worker); - Runtime::new(executor, proxy) + Runtime::new(executor, proxy.clone()) }; let (application, init_command) = { @@ -343,7 +343,7 @@ async fn run_instance<A, E, C>( mut application: A, mut compositor: C, mut runtime: Runtime<E, Proxy<A::Message>, A::Message>, - mut proxy: winit::event_loop::EventLoopProxy<A::Message>, + mut proxy: Proxy<A::Message>, mut debug: Debug, mut event_receiver: mpsc::UnboundedReceiver<Event<A::Message>>, mut control_sender: mpsc::UnboundedSender<Control>, @@ -408,6 +408,7 @@ async fn run_instance<A, E, C>( runtime.track(application.subscription().into_recipes()); let mut messages = Vec::new(); + let mut user_events = 0; debug.startup_finished(); @@ -482,6 +483,7 @@ async fn run_instance<A, E, C>( } event::Event::UserEvent(message) => { messages.push(message); + user_events += 1; } event::Event::WindowEvent { window_id: id, @@ -803,6 +805,11 @@ async fn run_instance<A, E, C>( &mut window_manager, cached_interfaces, )); + + if user_events > 0 { + proxy.free_slots(user_events); + user_events = 0; + } } } _ => {} @@ -845,7 +852,7 @@ fn update<A: Application, C, E: Executor>( runtime: &mut Runtime<E, Proxy<A::Message>, A::Message>, clipboard: &mut Clipboard, control_sender: &mut mpsc::UnboundedSender<Control>, - proxy: &mut winit::event_loop::EventLoopProxy<A::Message>, + proxy: &mut Proxy<A::Message>, debug: &mut Debug, messages: &mut Vec<A::Message>, window_manager: &mut WindowManager<A, C>, @@ -887,7 +894,7 @@ fn run_command<A, C, E>( runtime: &mut Runtime<E, Proxy<A::Message>, A::Message>, clipboard: &mut Clipboard, control_sender: &mut mpsc::UnboundedSender<Control>, - proxy: &mut winit::event_loop::EventLoopProxy<A::Message>, + proxy: &mut Proxy<A::Message>, debug: &mut Debug, window_manager: &mut WindowManager<A, C>, ui_caches: &mut FxHashMap<window::Id, user_interface::Cache>, @@ -913,9 +920,7 @@ fn run_command<A, C, E>( clipboard::Action::Read(tag, kind) => { let message = tag(clipboard.read(kind)); - proxy - .send_event(message) - .expect("Send message to event loop"); + proxy.send(message); } clipboard::Action::Write(contents, kind) => { clipboard.write(kind, contents); @@ -967,18 +972,12 @@ fn run_command<A, C, E>( .to_logical(window.raw.scale_factor()); proxy - .send_event(callback(Size::new( - size.width, - size.height, - ))) - .expect("Send message to event loop"); + .send(callback(Size::new(size.width, size.height))); } } window::Action::FetchMaximized(id, callback) => { if let Some(window) = window_manager.get_mut(id) { - proxy - .send_event(callback(window.raw.is_maximized())) - .expect("Send message to event loop"); + proxy.send(callback(window.raw.is_maximized())); } } window::Action::Maximize(id, maximized) => { @@ -988,9 +987,7 @@ fn run_command<A, C, E>( } window::Action::FetchMinimized(id, callback) => { if let Some(window) = window_manager.get_mut(id) { - proxy - .send_event(callback(window.raw.is_minimized())) - .expect("Send message to event loop"); + proxy.send(callback(window.raw.is_minimized())); } } window::Action::Minimize(id, minimized) => { @@ -1012,9 +1009,7 @@ fn run_command<A, C, E>( }) .ok(); - proxy - .send_event(callback(position)) - .expect("Send message to event loop"); + proxy.send(callback(position)); } } window::Action::Move(id, position) => { @@ -1049,9 +1044,7 @@ fn run_command<A, C, E>( core::window::Mode::Hidden }; - proxy - .send_event(tag(mode)) - .expect("Send message to event loop"); + proxy.send(tag(mode)); } } window::Action::ToggleMaximize(id) => { @@ -1099,9 +1092,7 @@ fn run_command<A, C, E>( } window::Action::FetchId(id, tag) => { if let Some(window) = window_manager.get_mut(id) { - proxy - .send_event(tag(window.raw.id().into())) - .expect("Send message to event loop"); + proxy.send(tag(window.raw.id().into())); } } window::Action::RunWithHandle(id, tag) => { @@ -1111,9 +1102,7 @@ fn run_command<A, C, E>( .get_mut(id) .and_then(|window| window.raw.window_handle().ok()) { - proxy - .send_event(tag(&handle)) - .expect("Send message to event loop"); + proxy.send(tag(&handle)); } } window::Action::Screenshot(id, tag) => { @@ -1126,12 +1115,10 @@ fn run_command<A, C, E>( &debug.overlay(), ); - proxy - .send_event(tag(window::Screenshot::new( - bytes, - window.state.physical_size(), - ))) - .expect("Event loop doesn't exist."); + proxy.send(tag(window::Screenshot::new( + bytes, + window.state.physical_size(), + ))); } } }, @@ -1140,7 +1127,7 @@ fn run_command<A, C, E>( #[cfg(feature = "system")] { let graphics_info = compositor.fetch_information(); - let proxy = proxy.clone(); + let mut proxy = proxy.clone(); let _ = std::thread::spawn(move || { let information = @@ -1148,9 +1135,7 @@ fn run_command<A, C, E>( let message = _tag(information); - proxy - .send_event(message) - .expect("Event loop doesn't exist."); + proxy.send(message); }); } } @@ -1175,9 +1160,7 @@ fn run_command<A, C, E>( match operation.finish() { operation::Outcome::None => {} operation::Outcome::Some(message) => { - proxy - .send_event(message) - .expect("Event loop doesn't exist."); + proxy.send(message); // operation completed, don't need to try to operate on rest of UIs break 'operate; @@ -1197,9 +1180,7 @@ fn run_command<A, C, E>( // TODO: Error handling (?) compositor.load_font(bytes.clone()); - proxy - .send_event(tagger(Ok(()))) - .expect("Send message to event loop"); + proxy.send(tagger(Ok(()))); } command::Action::Custom(_) => { log::warn!("Unsupported custom action in `iced_winit` shell"); diff --git a/winit/src/proxy.rs b/winit/src/proxy.rs index 1d6c48bb..a35e8a27 100644 --- a/winit/src/proxy.rs +++ b/winit/src/proxy.rs @@ -1,28 +1,101 @@ use crate::futures::futures::{ channel::mpsc, + stream, task::{Context, Poll}, - Sink, + Future, Sink, StreamExt, }; use std::pin::Pin; -/// An event loop proxy that implements `Sink`. +/// An event loop proxy with backpressure that implements `Sink`. #[derive(Debug)] pub struct Proxy<Message: 'static> { raw: winit::event_loop::EventLoopProxy<Message>, + sender: mpsc::Sender<Message>, + notifier: mpsc::Sender<usize>, } impl<Message: 'static> Clone for Proxy<Message> { fn clone(&self) -> Self { Self { raw: self.raw.clone(), + sender: self.sender.clone(), + notifier: self.notifier.clone(), } } } impl<Message: 'static> Proxy<Message> { + const MAX_SIZE: usize = 100; + /// Creates a new [`Proxy`] from an `EventLoopProxy`. - pub fn new(raw: winit::event_loop::EventLoopProxy<Message>) -> Self { - Self { raw } + pub fn new( + raw: winit::event_loop::EventLoopProxy<Message>, + ) -> (Self, impl Future<Output = ()>) { + let (notifier, processed) = mpsc::channel(Self::MAX_SIZE); + let (sender, receiver) = mpsc::channel(Self::MAX_SIZE); + let proxy = raw.clone(); + + let worker = async move { + enum Item<T> { + MessageProduced(T), + BatchProcessed(usize), + } + + let mut receiver = receiver.map(Item::MessageProduced); + let mut processed = processed.map(Item::BatchProcessed); + + let mut count = 0; + + loop { + if count < Self::MAX_SIZE { + let mut stream = + stream::select(receiver.by_ref(), processed.by_ref()); + + match stream.select_next_some().await { + Item::MessageProduced(message) => { + let _ = proxy.send_event(message); + + count += 1; + } + Item::BatchProcessed(amount) => { + count = count.saturating_sub(amount); + } + } + } else if let Item::BatchProcessed(amount) = + processed.select_next_some().await + { + count = count.saturating_sub(amount); + } + } + }; + + ( + Self { + raw, + sender, + notifier, + }, + worker, + ) + } + + /// Sends a `Message` to the event loop. + /// + /// Note: This skips the backpressure mechanism with an unbounded + /// channel. Use sparingly! + pub fn send(&mut self, message: Message) + where + Message: std::fmt::Debug, + { + self.raw + .send_event(message) + .expect("Send message to event loop"); + } + + /// Frees an amount of slots for additional messages to be queued in + /// this [`Proxy`]. + pub fn free_slots(&mut self, amount: usize) { + let _ = self.notifier.start_send(amount); } } @@ -30,32 +103,37 @@ impl<Message: 'static> Sink<Message> for Proxy<Message> { type Error = mpsc::SendError; fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) + self.sender.poll_ready(cx) } fn start_send( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, message: Message, ) -> Result<(), Self::Error> { - let _ = self.raw.send_event(message); - - Ok(()) + self.sender.start_send(message) } fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) + match self.sender.poll_ready(cx) { + Poll::Ready(Err(ref e)) if e.is_disconnected() => { + // If the receiver disconnected, we consider the sink to be flushed. + Poll::Ready(Ok(())) + } + x => x, + } } fn poll_close( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { + self.sender.disconnect(); Poll::Ready(Ok(())) } } |