summaryrefslogtreecommitdiffstats
path: root/futures/src/subscription
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 10:17:08 +0100
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 10:17:44 +0100
commitb5b17ed4d800c03beb3ad535d1069a7784e8dc1d (patch)
treeb9e6477bd11bd6784f8ee61e818b5f5ff1a44318 /futures/src/subscription
parentd50ff9b5d97d9c3d6c6c70a9b4efe764b6126c86 (diff)
downloadiced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.gz
iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.bz2
iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.zip
Create `iced_futures` and wire everything up
Diffstat (limited to 'futures/src/subscription')
-rw-r--r--futures/src/subscription/tracker.rs112
1 files changed, 112 insertions, 0 deletions
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
new file mode 100644
index 00000000..a942b619
--- /dev/null
+++ b/futures/src/subscription/tracker.rs
@@ -0,0 +1,112 @@
+use crate::Subscription;
+
+use futures::{future::BoxFuture, sink::Sink};
+use std::collections::HashMap;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Tracker<Hasher, Event> {
+ subscriptions: HashMap<u64, Execution<Event>>,
+ _hasher: PhantomData<Hasher>,
+}
+
+#[derive(Debug)]
+pub struct Execution<Event> {
+ _cancel: futures::channel::oneshot::Sender<()>,
+ listener: Option<futures::channel::mpsc::Sender<Event>>,
+}
+
+impl<Hasher, Event> Tracker<Hasher, Event>
+where
+ Hasher: std::hash::Hasher + Default,
+ Event: 'static + Send + Clone,
+{
+ pub fn new() -> Self {
+ Self {
+ subscriptions: HashMap::new(),
+ _hasher: PhantomData,
+ }
+ }
+
+ pub fn update<Message, Receiver>(
+ &mut self,
+ subscription: Subscription<Hasher, Event, Message>,
+ receiver: Receiver,
+ ) -> Vec<BoxFuture<'static, ()>>
+ where
+ Message: 'static + Send,
+ Receiver: 'static
+ + Sink<Message, Error = core::convert::Infallible>
+ + Unpin
+ + Send
+ + Clone,
+ {
+ use futures::{future::FutureExt, stream::StreamExt};
+
+ let mut futures = Vec::new();
+
+ let recipes = subscription.recipes();
+ let mut alive = std::collections::HashSet::new();
+
+ for recipe in recipes {
+ let id = {
+ let mut hasher = Hasher::default();
+ recipe.hash(&mut hasher);
+
+ hasher.finish()
+ };
+
+ let _ = alive.insert(id);
+
+ if self.subscriptions.contains_key(&id) {
+ continue;
+ }
+
+ let (cancel, cancelled) = futures::channel::oneshot::channel();
+
+ // TODO: Use bus if/when it supports async
+ let (event_sender, event_receiver) =
+ futures::channel::mpsc::channel(100);
+
+ let stream = recipe.stream(event_receiver.boxed());
+
+ let future = futures::future::select(
+ cancelled,
+ stream.map(Ok).forward(receiver.clone()),
+ )
+ .map(|_| ());
+
+ let _ = self.subscriptions.insert(
+ id,
+ Execution {
+ _cancel: cancel,
+ listener: if event_sender.is_closed() {
+ None
+ } else {
+ Some(event_sender)
+ },
+ },
+ );
+
+ futures.push(future.boxed());
+ }
+
+ self.subscriptions.retain(|id, _| alive.contains(&id));
+
+ futures
+ }
+
+ pub fn broadcast(&mut self, event: Event) {
+ self.subscriptions
+ .values_mut()
+ .filter_map(|connection| connection.listener.as_mut())
+ .for_each(|listener| {
+ if let Err(error) = listener.try_send(event.clone()) {
+ log::error!(
+ "Error sending event to subscription: {:?}",
+ error
+ );
+ }
+ });
+ }
+}