diff options
-rw-r--r-- | core/src/subscription.rs | 37 | ||||
-rw-r--r-- | examples/clock.rs | 49 | ||||
-rw-r--r-- | winit/src/application.rs | 50 |
3 files changed, 48 insertions, 88 deletions
diff --git a/core/src/subscription.rs b/core/src/subscription.rs index 1e6695d6..796982c7 100644 --- a/core/src/subscription.rs +++ b/core/src/subscription.rs @@ -2,60 +2,51 @@ /// An event subscription. pub struct Subscription<T> { - definitions: Vec<Box<dyn Definition<Message = T>>>, + handles: Vec<Box<dyn Handle<Output = T>>>, } impl<T> Subscription<T> { pub fn none() -> Self { Self { - definitions: Vec::new(), + handles: Vec::new(), } } pub fn batch(subscriptions: impl Iterator<Item = Subscription<T>>) -> Self { Self { - definitions: subscriptions - .flat_map(|subscription| subscription.definitions) + handles: subscriptions + .flat_map(|subscription| subscription.handles) .collect(), } } - pub fn definitions(self) -> Vec<Box<dyn Definition<Message = T>>> { - self.definitions + pub fn handles(self) -> Vec<Box<dyn Handle<Output = T>>> { + self.handles } } impl<T, A> From<A> for Subscription<T> where - A: Definition<Message = T> + 'static, + A: Handle<Output = T> + 'static, { - fn from(definition: A) -> Self { + fn from(handle: A) -> Self { Self { - definitions: vec![Box::new(definition)], + handles: vec![Box::new(handle)], } } } -/// The definition of an event subscription. -pub trait Definition { - type Message; +/// The handle of an event subscription. +pub trait Handle { + type Output; fn id(&self) -> u64; - fn stream( - &self, - ) -> ( - futures::stream::BoxStream<'static, Self::Message>, - Box<dyn Handle>, - ); -} - -pub trait Handle { - fn cancel(&mut self); + fn stream(&self) -> futures::stream::BoxStream<'static, Self::Output>; } impl<T> std::fmt::Debug for Subscription<T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Command").finish() + f.debug_struct("Subscription").finish() } } diff --git a/examples/clock.rs b/examples/clock.rs index b1ee8ab1..5a404bfa 100644 --- a/examples/clock.rs +++ b/examples/clock.rs @@ -82,7 +82,7 @@ impl Application for Clock { } mod time { - use std::sync::{Arc, Mutex}; + use std::sync::Arc; pub fn every<Message>( duration: std::time::Duration, @@ -108,59 +108,30 @@ mod time { >, } - struct TickState { - alive: Arc<Mutex<bool>>, - } - - impl iced::subscription::Handle for TickState { - fn cancel(&mut self) { - match self.alive.lock() { - Ok(mut guard) => *guard = false, - _ => {} - } - } - } - - impl<Message> iced::subscription::Definition for Tick<Message> + impl<Message> iced::subscription::Handle for Tick<Message> where Message: 'static, { - type Message = Message; + type Output = Message; fn id(&self) -> u64 { 0 } - fn stream( - &self, - ) -> ( - futures::stream::BoxStream<'static, Message>, - Box<dyn iced::subscription::Handle>, - ) { + fn stream(&self) -> futures::stream::BoxStream<'static, Message> { use futures::StreamExt; let duration = self.duration.clone(); let function = self.message.clone(); - let alive = Arc::new(Mutex::new(true)); - - let state = TickState { - alive: alive.clone(), - }; - - let stream = futures::stream::poll_fn(move |_| { - std::thread::sleep(duration); - - if !*alive.lock().unwrap() { - return std::task::Poll::Ready(None); - } - let now = chrono::Local::now(); + let stream = + futures::stream::iter(std::iter::repeat(())).map(move |_| { + std::thread::sleep(duration); - std::task::Poll::Ready(Some(now)) - }) - .map(move |time| function(time)); + function(chrono::Local::now()) + }); - (stream.boxed(), Box::new(state)) + stream.boxed() } } } diff --git a/winit/src/application.rs b/winit/src/application.rs index 959e142d..62970810 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,8 +2,8 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - subscription, Cache, Command, Container, Debug, Element, Event, Length, - MouseCursor, Settings, Subscription, UserInterface, + Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, + Settings, Subscription, UserInterface, }; use std::collections::HashMap; @@ -420,7 +420,7 @@ fn spawn<Message: Send>( } pub struct Subscriptions { - alive: HashMap<u64, Box<dyn subscription::Handle>>, + alive: HashMap<u64, futures::channel::oneshot::Sender<()>>, } impl Subscriptions { @@ -436,46 +436,44 @@ impl Subscriptions { thread_pool: &mut futures::executor::ThreadPool, proxy: &winit::event_loop::EventLoopProxy<Message>, ) { - use futures::stream::StreamExt; + use futures::{future::FutureExt, stream::StreamExt}; - let definitions = subscriptions.definitions(); + let handles = subscriptions.handles(); let mut alive = std::collections::HashSet::new(); - for definition in definitions { - let id = definition.id(); + for handle in handles { + let id = handle.id(); let _ = alive.insert(id); if !self.alive.contains_key(&id) { - let (stream, handle) = definition.stream(); + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + let stream = handle.stream(); let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); - let future = stream.for_each(move |message| { - proxy - .lock() - .expect("Acquire event loop proxy lock") - .send_event(message) - .expect("Send subscription result to event loop"); + let future = futures::future::select( + cancelled, + stream.for_each(move |message| { + proxy + .lock() + .expect("Acquire event loop proxy lock") + .send_event(message) + .expect("Send subscription result to event loop"); - futures::future::ready(()) - }); + futures::future::ready(()) + }), + ) + .map(|_| ()); thread_pool.spawn_ok(future); - let _ = self.alive.insert(id, handle); + let _ = self.alive.insert(id, cancel); } } - self.alive.retain(|id, handle| { - let is_still_alive = alive.contains(&id); - - if !is_still_alive { - handle.cancel(); - } - - is_still_alive - }); + self.alive.retain(|id, _| alive.contains(&id)); } } |