diff options
Diffstat (limited to 'winit/src')
-rw-r--r-- | winit/src/application.rs | 29 | ||||
-rw-r--r-- | winit/src/lib.rs | 1 | ||||
-rw-r--r-- | winit/src/subscription.rs | 97 |
3 files changed, 124 insertions, 3 deletions
diff --git a/winit/src/application.rs b/winit/src/application.rs index 85d06d9b..3b8ac16b 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,8 +2,8 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, - Settings, UserInterface, + subscription, Cache, Command, Container, Debug, Element, Event, Length, + MouseCursor, Settings, Subscription, UserInterface, }; /// An interactive, native cross-platform application. @@ -57,6 +57,15 @@ pub trait Application: Sized { /// [`Command`]: struct.Command.html fn update(&mut self, message: Self::Message) -> Command<Self::Message>; + /// Returns the event `Subscription` for the current state of the + /// application. + /// + /// The messages produced by the `Subscription` will be handled by + /// [`update`](#tymethod.update). + /// + /// A `Subscription` will be kept alive as long as you keep returning it! + fn subscription(&self) -> Subscription<Self::Message>; + /// Returns the widgets to display in the [`Application`]. /// /// These widgets can produce __messages__ based on user interaction. @@ -89,11 +98,15 @@ pub trait Application: Sized { let proxy = event_loop.create_proxy(); let mut thread_pool = futures::executor::ThreadPool::new().expect("Create thread pool"); + let mut subscription_pool = subscription::Pool::new(); let mut external_messages = Vec::new(); let (mut application, init_command) = Self::new(); spawn(init_command, &mut thread_pool, &proxy); + let subscription = application.subscription(); + subscription_pool.update(subscription, &mut thread_pool, &proxy); + let mut title = application.title(); let window = { @@ -176,6 +189,10 @@ pub trait Application: Sized { debug.layout_finished(); debug.event_processing_started(); + events.iter().for_each(|event| { + subscription_pool.broadcast_event(*event) + }); + let mut messages = user_interface.update(&renderer, events.drain(..)); messages.extend(external_messages.drain(..)); @@ -199,11 +216,17 @@ pub trait Application: Sized { debug.update_started(); let command = application.update(message); - spawn(command, &mut thread_pool, &proxy); debug.update_finished(); } + let subscription = application.subscription(); + subscription_pool.update( + subscription, + &mut thread_pool, + &proxy, + ); + // Update window title let new_title = application.title(); diff --git a/winit/src/lib.rs b/winit/src/lib.rs index df3a6997..8a1dc870 100644 --- a/winit/src/lib.rs +++ b/winit/src/lib.rs @@ -29,6 +29,7 @@ pub mod conversion; pub mod settings; mod application; +mod subscription; pub use application::Application; pub use settings::Settings; diff --git a/winit/src/subscription.rs b/winit/src/subscription.rs new file mode 100644 index 00000000..f55507af --- /dev/null +++ b/winit/src/subscription.rs @@ -0,0 +1,97 @@ +use iced_native::{Event, Hasher, Subscription}; +use std::collections::HashMap; + +pub struct Pool { + alive: HashMap<u64, Handle>, +} + +pub struct Handle { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option<futures::channel::mpsc::Sender<Event>>, +} + +impl Pool { + pub fn new() -> Self { + Self { + alive: HashMap::new(), + } + } + + pub fn update<Message: Send>( + &mut self, + subscription: Subscription<Message>, + thread_pool: &mut futures::executor::ThreadPool, + proxy: &winit::event_loop::EventLoopProxy<Message>, + ) { + use futures::{future::FutureExt, stream::StreamExt}; + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + use std::hash::Hasher as _; + + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if !self.alive.contains_key(&id) { + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + let proxy = proxy.clone(); + + let future = futures::future::select( + cancelled, + stream.for_each(move |message| { + proxy + .send_event(message) + .expect("Send subscription result to event loop"); + + futures::future::ready(()) + }), + ) + .map(|_| ()); + + thread_pool.spawn_ok(future); + + let _ = self.alive.insert( + id, + Handle { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + } + } + + self.alive.retain(|id, _| alive.contains(&id)); + } + + pub fn broadcast_event(&mut self, event: Event) { + self.alive + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} |