From b5b17ed4d800c03beb3ad535d1069a7784e8dc1d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 10:17:08 +0100 Subject: Create `iced_futures` and wire everything up --- futures/src/subscription/tracker.rs | 112 ++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 futures/src/subscription/tracker.rs (limited to 'futures/src/subscription/tracker.rs') diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..a942b619 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker { + subscriptions: HashMap>, + _hasher: PhantomData, +} + +#[derive(Debug)] +pub struct Execution { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Tracker +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update( + &mut self, + subscription: Subscription, + receiver: Receiver, + ) -> Vec> + where + Message: 'static + Send, + Receiver: 'static + + Sink + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(receiver.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} -- cgit From f14009601e270e43bdf29b8f4842cf136fbbd8b9 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 09:49:17 +0100 Subject: Write documentation for `iced_futures` --- futures/src/subscription/tracker.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'futures/src/subscription/tracker.rs') diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index a942b619..c8a1ee18 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -4,6 +4,11 @@ use futures::{future::BoxFuture, sink::Sink}; use std::collections::HashMap; use std::marker::PhantomData; +/// A registry of subscription streams. +/// +/// If you have an application that continuously returns a [`Subscription`], +/// you can use a [`Tracker`] to keep track of the different recipes and keep +/// its executions alive. #[derive(Debug)] pub struct Tracker { subscriptions: HashMap>, @@ -21,6 +26,9 @@ where Hasher: std::hash::Hasher + Default, Event: 'static + Send + Clone, { + /// Creates a new empty [`Tracker`]. + /// + /// [`Tracker`]: struct.Tracker.html pub fn new() -> Self { Self { subscriptions: HashMap::new(), @@ -28,6 +36,26 @@ where } } + /// Updates the [`Tracker`] with the given [`Subscription`]. + /// + /// A [`Subscription`] can cause new streams to be spawned or old streams + /// to be closed. + /// + /// The [`Tracker`] keeps track of these streams between calls to this + /// method: + /// + /// - If the provided [`Subscription`] contains a new [`Recipe`] that is + /// currently not being run, it will spawn a new stream and keep it alive. + /// - On the other hand, if a [`Recipe`] is currently in execution and the + /// provided [`Subscription`] does not contain it anymore, then the + /// [`Tracker`] will close and drop the relevant stream. + /// + /// It returns a list of futures that need to be spawned to materialize + /// the [`Tracker`] changes. + /// + /// [`Tracker`]: struct.Tracker.html + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html pub fn update( &mut self, subscription: Subscription, @@ -96,6 +124,14 @@ where futures } + /// Broadcasts an event to the subscriptions currently alive. + /// + /// A subscription's [`Recipe::stream`] always receives a stream of events + /// as input. This stream can be used by some subscription to listen to + /// shell events. + /// + /// This method publishes the given event to all the subscription streams + /// currently open. pub fn broadcast(&mut self, event: Event) { self.subscriptions .values_mut() -- cgit