From d6c3da21f7fe7a79bcfbc2a180dc111e42300a04 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 14 Dec 2019 05:56:46 +0100 Subject: Write docs for subscriptions and reorganize a bit --- winit/src/subscription.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 winit/src/subscription.rs (limited to 'winit/src/subscription.rs') diff --git a/winit/src/subscription.rs b/winit/src/subscription.rs new file mode 100644 index 00000000..610bdc10 --- /dev/null +++ b/winit/src/subscription.rs @@ -0,0 +1,97 @@ +use iced_native::{Event, Hasher, Subscription}; +use std::collections::HashMap; + +pub struct Pool { + alive: HashMap, +} + +pub struct Handle { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Pool { + pub fn new() -> Self { + Self { + alive: HashMap::new(), + } + } + + pub fn update( + &mut self, + subscription: Subscription, + thread_pool: &mut futures::executor::ThreadPool, + proxy: &winit::event_loop::EventLoopProxy, + ) { + use futures::{future::FutureExt, stream::StreamExt}; + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + use std::hash::Hasher as _; + + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if !self.alive.contains_key(&id) { + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + let proxy = proxy.clone(); + + let future = futures::future::select( + cancelled, + stream.for_each(move |message| { + proxy + .send_event(message) + .expect("Send subscription result to event loop"); + + futures::future::ready(()) + }), + ) + .map(|_| ()); + + thread_pool.spawn_ok(future); + + let _ = self.alive.insert( + id, + Handle { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + } + } + + self.alive.retain(|id, _| alive.contains(&id)); + } + + pub fn broadcast_event(&mut self, event: Event) { + self.alive + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event) { + log::warn!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} -- cgit