diff options
author | 2019-12-07 08:51:44 +0100 | |
---|---|---|
committer | 2019-12-07 08:51:44 +0100 | |
commit | 48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad (patch) | |
tree | 0dbd544f93a737ef64df648eafee3aa59c6bae62 /winit/src | |
parent | e55dfa75510aa11ec197796668a772f3be4c52c7 (diff) | |
download | iced-48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad.tar.gz iced-48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad.tar.bz2 iced-48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad.zip |
Use `oneshot` and `future::select` to cancel streams
Diffstat (limited to 'winit/src')
-rw-r--r-- | winit/src/application.rs | 50 |
1 files changed, 24 insertions, 26 deletions
diff --git a/winit/src/application.rs b/winit/src/application.rs index 959e142d..62970810 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -2,8 +2,8 @@ use crate::{ conversion, input::{keyboard, mouse}, renderer::{Target, Windowed}, - subscription, Cache, Command, Container, Debug, Element, Event, Length, - MouseCursor, Settings, Subscription, UserInterface, + Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, + Settings, Subscription, UserInterface, }; use std::collections::HashMap; @@ -420,7 +420,7 @@ fn spawn<Message: Send>( } pub struct Subscriptions { - alive: HashMap<u64, Box<dyn subscription::Handle>>, + alive: HashMap<u64, futures::channel::oneshot::Sender<()>>, } impl Subscriptions { @@ -436,46 +436,44 @@ impl Subscriptions { thread_pool: &mut futures::executor::ThreadPool, proxy: &winit::event_loop::EventLoopProxy<Message>, ) { - use futures::stream::StreamExt; + use futures::{future::FutureExt, stream::StreamExt}; - let definitions = subscriptions.definitions(); + let handles = subscriptions.handles(); let mut alive = std::collections::HashSet::new(); - for definition in definitions { - let id = definition.id(); + for handle in handles { + let id = handle.id(); let _ = alive.insert(id); if !self.alive.contains_key(&id) { - let (stream, handle) = definition.stream(); + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + let stream = handle.stream(); let proxy = std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); - let future = stream.for_each(move |message| { - proxy - .lock() - .expect("Acquire event loop proxy lock") - .send_event(message) - .expect("Send subscription result to event loop"); + let future = futures::future::select( + cancelled, + stream.for_each(move |message| { + proxy + .lock() + .expect("Acquire event loop proxy lock") + .send_event(message) + .expect("Send subscription result to event loop"); - futures::future::ready(()) - }); + futures::future::ready(()) + }), + ) + .map(|_| ()); thread_pool.spawn_ok(future); - let _ = self.alive.insert(id, handle); + let _ = self.alive.insert(id, cancel); } } - self.alive.retain(|id, handle| { - let is_still_alive = alive.contains(&id); - - if !is_still_alive { - handle.cancel(); - } - - is_still_alive - }); + self.alive.retain(|id, _| alive.contains(&id)); } } |