From d575f4541126e2ab25908fe55c6805f16716b2a5 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Thu, 5 Dec 2019 06:10:13 +0100 Subject: Draft first version of event subscriptions :tada: --- winit/src/application.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 2 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 3772a667..959e142d 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,9 +2,10 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, - Settings, UserInterface, + subscription, Cache, Command, Container, Debug, Element, Event, Length, + MouseCursor, Settings, Subscription, UserInterface, }; +use std::collections::HashMap; /// An interactive, native cross-platform application. /// @@ -57,6 +58,9 @@ pub trait Application: Sized { /// [`Command`]: struct.Command.html fn update(&mut self, message: Self::Message) -> Command; + /// TODO + fn subscriptions(&self) -> Subscription; + /// Returns the widgets to display in the [`Application`]. /// /// These widgets can produce __messages__ based on user interaction. @@ -89,11 +93,15 @@ pub trait Application: Sized { let proxy = event_loop.create_proxy(); let mut thread_pool = futures::executor::ThreadPool::new().expect("Create thread pool"); + let mut alive_subscriptions = Subscriptions::new(); let mut external_messages = Vec::new(); let (mut application, init_command) = Self::new(); spawn(init_command, &mut thread_pool, &proxy); + let subscriptions = application.subscriptions(); + alive_subscriptions.update(subscriptions, &mut thread_pool, &proxy); + let mut title = application.title(); let window = { @@ -204,6 +212,13 @@ pub trait Application: Sized { debug.update_finished(); } + let subscriptions = application.subscriptions(); + alive_subscriptions.update( + subscriptions, + &mut thread_pool, + &proxy, + ); + // Update window title let new_title = application.title(); @@ -404,6 +419,66 @@ fn spawn( } } +pub struct Subscriptions { + alive: HashMap>, +} + +impl Subscriptions { + fn new() -> Self { + Self { + alive: HashMap::new(), + } + } + + fn update( + &mut self, + subscriptions: Subscription, + thread_pool: &mut futures::executor::ThreadPool, + proxy: &winit::event_loop::EventLoopProxy, + ) { + use futures::stream::StreamExt; + + let definitions = subscriptions.definitions(); + let mut alive = std::collections::HashSet::new(); + + for definition in definitions { + let id = definition.id(); + let _ = alive.insert(id); + + if !self.alive.contains_key(&id) { + let (stream, handle) = definition.stream(); + + let proxy = + std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); + + let future = stream.for_each(move |message| { + proxy + .lock() + .expect("Acquire event loop proxy lock") + .send_event(message) + .expect("Send subscription result to event loop"); + + futures::future::ready(()) + }); + + thread_pool.spawn_ok(future); + + let _ = self.alive.insert(id, handle); + } + } + + self.alive.retain(|id, handle| { + let is_still_alive = alive.contains(&id); + + if !is_still_alive { + handle.cancel(); + } + + is_still_alive + }); + } +} + // As defined in: http://www.unicode.org/faq/private_use.html // TODO: Remove once https://github.com/rust-windowing/winit/pull/1254 lands fn is_private_use_character(c: char) -> bool { -- cgit From 48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 7 Dec 2019 08:51:44 +0100 Subject: Use `oneshot` and `future::select` to cancel streams --- winit/src/application.rs | 50 +++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 959e142d..62970810 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,8 +2,8 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - subscription, Cache, Command, Container, Debug, Element, Event, Length, - MouseCursor, Settings, Subscription, UserInterface, + Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, + Settings, Subscription, UserInterface, }; use std::collections::HashMap; @@ -420,7 +420,7 @@ fn spawn( } pub struct Subscriptions { - alive: HashMap>, + alive: HashMap>, } impl Subscriptions { @@ -436,46 +436,44 @@ impl Subscriptions { thread_pool: &mut futures::executor::ThreadPool, proxy: &winit::event_loop::EventLoopProxy, ) { - use futures::stream::StreamExt; + use futures::{future::FutureExt, stream::StreamExt}; - let definitions = subscriptions.definitions(); + let handles = subscriptions.handles(); let mut alive = std::collections::HashSet::new(); - for definition in definitions { - let id = definition.id(); + for handle in handles { + let id = handle.id(); let _ = alive.insert(id); if !self.alive.contains_key(&id) { - let (stream, handle) = definition.stream(); + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + let stream = handle.stream(); let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); - let future = stream.for_each(move |message| { - proxy - .lock() - .expect("Acquire event loop proxy lock") - .send_event(message) - .expect("Send subscription result to event loop"); + let future = futures::future::select( + cancelled, + stream.for_each(move |message| { + proxy + .lock() + .expect("Acquire event loop proxy lock") + .send_event(message) + .expect("Send subscription result to event loop"); - futures::future::ready(()) - }); + futures::future::ready(()) + }), + ) + .map(|_| ()); thread_pool.spawn_ok(future); - let _ = self.alive.insert(id, handle); + let _ = self.alive.insert(id, cancel); } } - self.alive.retain(|id, handle| { - let is_still_alive = alive.contains(&id); - - if !is_still_alive { - handle.cancel(); - } - - is_still_alive - }); + self.alive.retain(|id, _| alive.contains(&id)); } } -- cgit From 98160406f714728afe718f305bf9d12be1676b2d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 8 Dec 2019 08:21:26 +0100 Subject: Allow listening to runtime events in subscriptions --- winit/src/application.rs | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 26ebdb05..49a01320 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -184,6 +184,10 @@ pub trait Application: Sized { debug.layout_finished(); debug.event_processing_started(); + events + .iter() + .for_each(|event| alive_subscriptions.send_event(*event)); + let mut messages = user_interface.update(&renderer, events.drain(..)); messages.extend(external_messages.drain(..)); @@ -207,7 +211,6 @@ pub trait Application: Sized { debug.update_started(); let command = application.update(message); - spawn(command, &mut thread_pool, &proxy); debug.update_finished(); } @@ -422,7 +425,12 @@ fn spawn( } pub struct Subscriptions { - alive: HashMap>, + alive: HashMap, +} + +pub struct Connection { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, } impl Subscriptions { @@ -440,17 +448,19 @@ impl Subscriptions { ) { use futures::{future::FutureExt, stream::StreamExt}; - let handles = subscriptions.handles(); + let connections = subscriptions.connections(); let mut alive = std::collections::HashSet::new(); - for handle in handles { - let id = handle.id(); + for connection in connections { + let id = connection.id(); let _ = alive.insert(id); if !self.alive.contains_key(&id) { let (cancel, cancelled) = futures::channel::oneshot::channel(); + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); - let stream = handle.stream(); + let stream = connection.stream(event_receiver); let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); @@ -471,12 +481,36 @@ impl Subscriptions { thread_pool.spawn_ok(future); - let _ = self.alive.insert(id, cancel); + let _ = self.alive.insert( + id, + Connection { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); } } self.alive.retain(|id, _| alive.contains(&id)); } + + fn send_event(&mut self, event: Event) { + self.alive + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event) { + log::warn!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } } // As defined in: http://www.unicode.org/faq/private_use.html -- cgit From cdb7acf6c20fe13a09e75ea1c47d53ced6174698 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Tue, 10 Dec 2019 03:43:00 +0100 Subject: Implement `Subscription::map` and `from_recipe` --- winit/src/application.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 49a01320..fb31c44f 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,8 +2,8 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, - Settings, Subscription, UserInterface, + Cache, Command, Container, Debug, Element, Event, Hasher, Length, + MouseCursor, Settings, Subscription, UserInterface, }; use std::collections::HashMap; @@ -448,11 +448,19 @@ impl Subscriptions { ) { use futures::{future::FutureExt, stream::StreamExt}; - let connections = subscriptions.connections(); + let recipes = subscriptions.recipes(); let mut alive = std::collections::HashSet::new(); - for connection in connections { - let id = connection.id(); + for recipe in recipes { + let id = { + use std::hash::Hasher as _; + + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + let _ = alive.insert(id); if !self.alive.contains_key(&id) { @@ -460,7 +468,7 @@ impl Subscriptions { let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); - let stream = connection.stream(event_receiver); + let stream = recipe.stream(event_receiver); let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); -- cgit From c688452d7beb1b17ef8416fc101f8868767fc457 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 14 Dec 2019 01:13:01 +0100 Subject: Consume `Recipe` when building a `Stream` --- winit/src/application.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index fb31c44f..67a035f7 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -470,6 +470,7 @@ impl Subscriptions { let stream = recipe.stream(event_receiver); + // TODO: Find out how to avoid using a mutex here let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); -- cgit From 65ff3744a06ca5bcb2e1682862d49520bf7ccbfe Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 14 Dec 2019 01:33:31 +0100 Subject: Remove unnecessary event loop proxy `Mutex` I am not sure why I had to use it in the first place... --- winit/src/application.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 67a035f7..25396b6f 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -469,17 +469,12 @@ impl Subscriptions { futures::channel::mpsc::channel(100); let stream = recipe.stream(event_receiver); - - // TODO: Find out how to avoid using a mutex here - let proxy = - std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); + let proxy = proxy.clone(); let future = futures::future::select( cancelled, stream.for_each(move |message| { proxy - .lock() - .expect("Acquire event loop proxy lock") .send_event(message) .expect("Send subscription result to event loop"); -- cgit From 293314405f5b8d4003db5ef8f428e659ae36872d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 14 Dec 2019 04:49:13 +0100 Subject: Make `iced_native` subscription input opaque --- winit/src/application.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 25396b6f..2d30e9f3 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -468,7 +468,7 @@ impl Subscriptions { let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); - let stream = recipe.stream(event_receiver); + let stream = recipe.stream(event_receiver.boxed()); let proxy = proxy.clone(); let future = futures::future::select( -- cgit From d6c3da21f7fe7a79bcfbc2a180dc111e42300a04 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 14 Dec 2019 05:56:46 +0100 Subject: Write docs for subscriptions and reorganize a bit --- winit/src/application.rs | 124 +++++++---------------------------------------- 1 file changed, 18 insertions(+), 106 deletions(-) (limited to 'winit/src/application.rs') diff --git a/winit/src/application.rs b/winit/src/application.rs index 2d30e9f3..3b8ac16b 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,10 +2,9 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - Cache, Command, Container, Debug, Element, Event, Hasher, Length, + subscription, Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, Settings, Subscription, UserInterface, }; -use std::collections::HashMap; /// An interactive, native cross-platform application. /// @@ -58,8 +57,14 @@ pub trait Application: Sized { /// [`Command`]: struct.Command.html fn update(&mut self, message: Self::Message) -> Command; - /// TODO - fn subscriptions(&self) -> Subscription; + /// Returns the event `Subscription` for the current state of the + /// application. + /// + /// The messages produced by the `Subscription` will be handled by + /// [`update`](#tymethod.update). + /// + /// A `Subscription` will be kept alive as long as you keep returning it! + fn subscription(&self) -> Subscription; /// Returns the widgets to display in the [`Application`]. /// @@ -93,14 +98,14 @@ pub trait Application: Sized { let proxy = event_loop.create_proxy(); let mut thread_pool = futures::executor::ThreadPool::new().expect("Create thread pool"); - let mut alive_subscriptions = Subscriptions::new(); + let mut subscription_pool = subscription::Pool::new(); let mut external_messages = Vec::new(); let (mut application, init_command) = Self::new(); spawn(init_command, &mut thread_pool, &proxy); - let subscriptions = application.subscriptions(); - alive_subscriptions.update(subscriptions, &mut thread_pool, &proxy); + let subscription = application.subscription(); + subscription_pool.update(subscription, &mut thread_pool, &proxy); let mut title = application.title(); @@ -184,9 +189,9 @@ pub trait Application: Sized { debug.layout_finished(); debug.event_processing_started(); - events - .iter() - .for_each(|event| alive_subscriptions.send_event(*event)); + events.iter().for_each(|event| { + subscription_pool.broadcast_event(*event) + }); let mut messages = user_interface.update(&renderer, events.drain(..)); @@ -215,9 +220,9 @@ pub trait Application: Sized { debug.update_finished(); } - let subscriptions = application.subscriptions(); - alive_subscriptions.update( - subscriptions, + let subscription = application.subscription(); + subscription_pool.update( + subscription, &mut thread_pool, &proxy, ); @@ -424,99 +429,6 @@ fn spawn( } } -pub struct Subscriptions { - alive: HashMap, -} - -pub struct Connection { - _cancel: futures::channel::oneshot::Sender<()>, - listener: Option>, -} - -impl Subscriptions { - fn new() -> Self { - Self { - alive: HashMap::new(), - } - } - - fn update( - &mut self, - subscriptions: Subscription, - thread_pool: &mut futures::executor::ThreadPool, - proxy: &winit::event_loop::EventLoopProxy, - ) { - use futures::{future::FutureExt, stream::StreamExt}; - - let recipes = subscriptions.recipes(); - let mut alive = std::collections::HashSet::new(); - - for recipe in recipes { - let id = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - recipe.hash(&mut hasher); - - hasher.finish() - }; - - let _ = alive.insert(id); - - if !self.alive.contains_key(&id) { - let (cancel, cancelled) = futures::channel::oneshot::channel(); - let (event_sender, event_receiver) = - futures::channel::mpsc::channel(100); - - let stream = recipe.stream(event_receiver.boxed()); - let proxy = proxy.clone(); - - let future = futures::future::select( - cancelled, - stream.for_each(move |message| { - proxy - .send_event(message) - .expect("Send subscription result to event loop"); - - futures::future::ready(()) - }), - ) - .map(|_| ()); - - thread_pool.spawn_ok(future); - - let _ = self.alive.insert( - id, - Connection { - _cancel: cancel, - listener: if event_sender.is_closed() { - None - } else { - Some(event_sender) - }, - }, - ); - } - } - - self.alive.retain(|id, _| alive.contains(&id)); - } - - fn send_event(&mut self, event: Event) { - self.alive - .values_mut() - .filter_map(|connection| connection.listener.as_mut()) - .for_each(|listener| { - if let Err(error) = listener.try_send(event) { - log::warn!( - "Error sending event to subscription: {:?}", - error - ); - } - }); - } -} - // As defined in: http://www.unicode.org/faq/private_use.html // TODO: Remove once https://github.com/rust-windowing/winit/pull/1254 lands fn is_private_use_character(c: char) -> bool { -- cgit