summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 08:36:44 +0100
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 08:36:44 +0100
commit32f7ca261f0655938ae7c8919599b020ddea8ff8 (patch)
tree65e0e9b440c70225b42a8125f94bc17ca1e77201
parent6ca5e6184f9f1c12b427bdafcce0b4e9fbc5bb14 (diff)
downloadiced-32f7ca261f0655938ae7c8919599b020ddea8ff8.tar.gz
iced-32f7ca261f0655938ae7c8919599b020ddea8ff8.tar.bz2
iced-32f7ca261f0655938ae7c8919599b020ddea8ff8.zip
Implement `subscription::Tracker` in `iced_core`
-rw-r--r--core/Cargo.toml3
-rw-r--r--core/src/lib.rs2
-rw-r--r--core/src/subscription.rs11
-rw-r--r--core/src/subscription/tracker.rs112
-rw-r--r--examples/stopwatch.rs2
-rw-r--r--native/src/subscription.rs7
-rw-r--r--native/src/subscription/events.rs2
7 files changed, 131 insertions, 8 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 0a8fd8ef..4e019ba9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -11,7 +11,8 @@ repository = "https://github.com/hecrj/iced"
# Exposes a future-based `Command` type
command = ["futures"]
# Exposes a future-based `Subscription` type
-subscription = ["futures"]
+subscription = ["futures", "log"]
[dependencies]
futures = { version = "0.3", optional = true }
+log = { version = "0.4", optional = true }
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 821b09c1..6f13c310 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -9,7 +9,7 @@
//! [Iced]: https://github.com/hecrj/iced
//! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native
//! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web
-#![deny(missing_docs)]
+//#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
#![deny(unused_results)]
#![deny(unsafe_code)]
diff --git a/core/src/subscription.rs b/core/src/subscription.rs
index d9e7e388..87e51e48 100644
--- a/core/src/subscription.rs
+++ b/core/src/subscription.rs
@@ -1,4 +1,9 @@
//! Listen to external events in your application.
+mod tracker;
+
+pub use tracker::Tracker;
+
+use futures::stream::BoxStream;
/// A request to listen to external events.
///
@@ -134,8 +139,8 @@ pub trait Recipe<Hasher: std::hash::Hasher, Input> {
/// [`Recipe`]: trait.Recipe.html
fn stream(
self: Box<Self>,
- input: Input,
- ) -> futures::stream::BoxStream<'static, Self::Output>;
+ input: BoxStream<'static, Input>,
+ ) -> BoxStream<'static, Self::Output>;
}
struct Map<Hasher, Input, A, B> {
@@ -169,7 +174,7 @@ where
fn stream(
self: Box<Self>,
- input: I,
+ input: BoxStream<'static, I>,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::StreamExt;
diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs
new file mode 100644
index 00000000..826f60c0
--- /dev/null
+++ b/core/src/subscription/tracker.rs
@@ -0,0 +1,112 @@
+use crate::Subscription;
+
+use futures::{future::BoxFuture, sink::Sink};
+use std::collections::HashMap;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Tracker<Hasher, Event> {
+ subscriptions: HashMap<u64, Execution<Event>>,
+ _hasher: PhantomData<Hasher>,
+}
+
+#[derive(Debug)]
+pub struct Execution<Event> {
+ _cancel: futures::channel::oneshot::Sender<()>,
+ listener: Option<futures::channel::mpsc::Sender<Event>>,
+}
+
+impl<Hasher, Event> Tracker<Hasher, Event>
+where
+ Hasher: std::hash::Hasher + Default,
+ Event: 'static + Send + Clone,
+{
+ pub fn new() -> Self {
+ Self {
+ subscriptions: HashMap::new(),
+ _hasher: PhantomData,
+ }
+ }
+
+ pub fn update<Message, S>(
+ &mut self,
+ subscription: Subscription<Hasher, Event, Message>,
+ sink: S,
+ ) -> Vec<BoxFuture<'static, ()>>
+ where
+ Message: 'static + Send,
+ S: 'static
+ + Sink<Message, Error = core::convert::Infallible>
+ + Unpin
+ + Send
+ + Clone,
+ {
+ use futures::{future::FutureExt, stream::StreamExt};
+
+ let mut futures = Vec::new();
+
+ let recipes = subscription.recipes();
+ let mut alive = std::collections::HashSet::new();
+
+ for recipe in recipes {
+ let id = {
+ let mut hasher = Hasher::default();
+ recipe.hash(&mut hasher);
+
+ hasher.finish()
+ };
+
+ let _ = alive.insert(id);
+
+ if self.subscriptions.contains_key(&id) {
+ continue;
+ }
+
+ let (cancel, cancelled) = futures::channel::oneshot::channel();
+
+ // TODO: Use bus if/when it supports async
+ let (event_sender, event_receiver) =
+ futures::channel::mpsc::channel(100);
+
+ let stream = recipe.stream(event_receiver.boxed());
+
+ let future = futures::future::select(
+ cancelled,
+ stream.map(Ok).forward(sink.clone()),
+ )
+ .map(|_| ());
+
+ let _ = self.subscriptions.insert(
+ id,
+ Execution {
+ _cancel: cancel,
+ listener: if event_sender.is_closed() {
+ None
+ } else {
+ Some(event_sender)
+ },
+ },
+ );
+
+ futures.push(future.boxed());
+ }
+
+ self.subscriptions.retain(|id, _| alive.contains(&id));
+
+ futures
+ }
+
+ pub fn broadcast(&mut self, event: Event) {
+ self.subscriptions
+ .values_mut()
+ .filter_map(|connection| connection.listener.as_mut())
+ .for_each(|listener| {
+ if let Err(error) = listener.try_send(event.clone()) {
+ log::error!(
+ "Error sending event to subscription: {:?}",
+ error
+ );
+ }
+ });
+ }
+}
diff --git a/examples/stopwatch.rs b/examples/stopwatch.rs
index c9a61ee9..2bc85c4d 100644
--- a/examples/stopwatch.rs
+++ b/examples/stopwatch.rs
@@ -165,7 +165,7 @@ mod time {
fn stream(
self: Box<Self>,
- _input: I,
+ _input: futures::stream::BoxStream<'static, I>,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::stream::StreamExt;
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index db88867a..cd0822c1 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -15,7 +15,7 @@ use futures::stream::BoxStream;
///
/// [`Command`]: ../struct.Command.html
/// [`Subscription`]: struct.Subscription.html
-pub type Subscription<T> = iced_core::Subscription<Hasher, EventStream, T>;
+pub type Subscription<T> = iced_core::Subscription<Hasher, Event, T>;
/// A stream of runtime events.
///
@@ -24,6 +24,11 @@ pub type Subscription<T> = iced_core::Subscription<Hasher, EventStream, T>;
/// [`Subscription`]: type.Subscription.html
pub type EventStream = BoxStream<'static, Event>;
+/// A native [`Subscription`] tracker.
+///
+/// [`Subscription`]: type.Subscription.html
+pub type Tracker = iced_core::subscription::Tracker<Hasher, Event>;
+
pub use iced_core::subscription::Recipe;
mod events;
diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs
index b7301828..6ff2c0fb 100644
--- a/native/src/subscription/events.rs
+++ b/native/src/subscription/events.rs
@@ -5,7 +5,7 @@ use crate::{
pub struct Events;
-impl Recipe<Hasher, EventStream> for Events {
+impl Recipe<Hasher, Event> for Events {
type Output = Event;
fn hash(&self, state: &mut Hasher) {