From b5b17ed4d800c03beb3ad535d1069a7784e8dc1d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 10:17:08 +0100 Subject: Create `iced_futures` and wire everything up --- Cargo.toml | 1 + core/Cargo.toml | 10 -- core/src/command.rs | 100 ------------------- core/src/lib.rs | 18 ---- core/src/runtime.rs | 74 -------------- core/src/runtime/executor.rs | 11 --- core/src/subscription.rs | 188 ------------------------------------ core/src/subscription/tracker.rs | 112 --------------------- futures/Cargo.toml | 23 +++++ futures/src/command.rs | 100 +++++++++++++++++++ futures/src/lib.rs | 8 ++ futures/src/runtime.rs | 79 +++++++++++++++ futures/src/runtime/executor.rs | 26 +++++ futures/src/subscription.rs | 188 ++++++++++++++++++++++++++++++++++++ futures/src/subscription/tracker.rs | 112 +++++++++++++++++++++ native/Cargo.toml | 9 +- native/src/lib.rs | 7 +- native/src/runtime.rs | 14 +++ native/src/subscription.rs | 6 +- web/Cargo.toml | 9 +- web/src/lib.rs | 6 +- web/src/subscription.rs | 4 +- winit/Cargo.toml | 12 ++- winit/src/application.rs | 57 ++++------- winit/src/lib.rs | 3 +- winit/src/proxy.rs | 57 +++++++++++ winit/src/subscription.rs | 97 ------------------- 27 files changed, 666 insertions(+), 665 deletions(-) delete mode 100644 core/src/command.rs delete mode 100644 core/src/runtime.rs delete mode 100644 core/src/runtime/executor.rs delete mode 100644 core/src/subscription.rs delete mode 100644 core/src/subscription/tracker.rs create mode 100644 futures/Cargo.toml create mode 100644 futures/src/command.rs create mode 100644 futures/src/lib.rs create mode 100644 futures/src/runtime.rs create mode 100644 futures/src/runtime/executor.rs create mode 100644 futures/src/subscription.rs create mode 100644 futures/src/subscription/tracker.rs create mode 100644 native/src/runtime.rs create mode 100644 winit/src/proxy.rs delete mode 100644 winit/src/subscription.rs diff --git a/Cargo.toml b/Cargo.toml index aeb8382e..fbe3b9f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ maintenance = { status = "actively-developed" } [workspace] members = [ "core", + "futures", "native", "style", "web", diff --git a/core/Cargo.toml b/core/Cargo.toml index 5e1a5532..22bc7ceb 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -7,14 +7,4 @@ description = "The essential concepts of Iced" license = "MIT" repository = "https://github.com/hecrj/iced" -[features] -# Exposes a future-based `Command` type -command = ["futures"] -# Exposes a future-based `Subscription` type -subscription = ["futures", "log"] -# Exposes a `runtime` module meant to abstract over different future executors -runtime = ["command", "subscription"] - [dependencies] -futures = { version = "0.3", optional = true } -log = { version = "0.4", optional = true } diff --git a/core/src/command.rs b/core/src/command.rs deleted file mode 100644 index e7885fb8..00000000 --- a/core/src/command.rs +++ /dev/null @@ -1,100 +0,0 @@ -use futures::future::{BoxFuture, Future, FutureExt}; - -/// A collection of async operations. -/// -/// You should be able to turn a future easily into a [`Command`], either by -/// using the `From` trait or [`Command::perform`]. -/// -/// [`Command`]: struct.Command.html -pub struct Command { - futures: Vec>, -} - -impl Command { - /// Creates an empty [`Command`]. - /// - /// In other words, a [`Command`] that does nothing. - /// - /// [`Command`]: struct.Command.html - pub fn none() -> Self { - Self { - futures: Vec::new(), - } - } - - /// Creates a [`Command`] that performs the action of the given future. - /// - /// [`Command`]: struct.Command.html - pub fn perform( - future: impl Future + 'static + Send, - f: impl Fn(T) -> A + 'static + Send, - ) -> Command { - Command { - futures: vec![future.map(f).boxed()], - } - } - - /// Applies a transformation to the result of a [`Command`]. - /// - /// [`Command`]: struct.Command.html - pub fn map( - mut self, - f: impl Fn(T) -> A + 'static + Send + Sync, - ) -> Command - where - T: 'static, - { - let f = std::sync::Arc::new(f); - - Command { - futures: self - .futures - .drain(..) - .map(|future| { - let f = f.clone(); - - future.map(move |result| f(result)).boxed() - }) - .collect(), - } - } - - /// Creates a [`Command`] that performs the actions of all the given - /// commands. - /// - /// Once this command is run, all the commands will be exectued at once. - /// - /// [`Command`]: struct.Command.html - pub fn batch(commands: impl IntoIterator>) -> Self { - Self { - futures: commands - .into_iter() - .flat_map(|command| command.futures) - .collect(), - } - } - - /// Converts a [`Command`] into its underlying list of futures. - /// - /// [`Command`]: struct.Command.html - pub fn futures(self) -> Vec> { - self.futures - } -} - -impl From for Command -where - A: Future + 'static + Send, -{ - fn from(future: A) -> Self { - Self { - futures: vec![future.boxed()], - } - } -} - -impl std::fmt::Debug for Command { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Command").finish() - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 760acefe..bec307ad 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,21 +32,3 @@ pub use length::Length; pub use point::Point; pub use rectangle::Rectangle; pub use vector::Vector; - -#[cfg(feature = "command")] -mod command; - -#[cfg(feature = "command")] -pub use command::Command; - -#[cfg(feature = "subscription")] -pub mod subscription; - -#[cfg(feature = "subscription")] -pub use subscription::Subscription; - -#[cfg(feature = "runtime")] -mod runtime; - -#[cfg(feature = "runtime")] -pub use runtime::Runtime; diff --git a/core/src/runtime.rs b/core/src/runtime.rs deleted file mode 100644 index 31234d11..00000000 --- a/core/src/runtime.rs +++ /dev/null @@ -1,74 +0,0 @@ -mod executor; - -pub use executor::Executor; - -use crate::{subscription, Command, Subscription}; - -use futures::Sink; -use std::marker::PhantomData; - -#[derive(Debug)] -pub struct Runtime { - executor: Executor, - subscriptions: subscription::Tracker, - receiver: Receiver, - _message: PhantomData, -} - -impl - Runtime -where - Hasher: std::hash::Hasher + Default, - Event: Send + Clone + 'static, - Executor: self::Executor, - Receiver: Sink - + Unpin - + Send - + Clone - + 'static, - Message: Send + 'static, -{ - pub fn new(receiver: Receiver) -> Self { - Self { - executor: Executor::new(), - subscriptions: subscription::Tracker::new(), - receiver, - _message: PhantomData, - } - } - - pub fn spawn(&mut self, command: Command) { - use futures::{FutureExt, SinkExt}; - - let futures = command.futures(); - - for future in futures { - let mut receiver = self.receiver.clone(); - - self.executor.spawn(future.then(|message| { - async move { - let _ = receiver.send(message).await; - - () - } - })); - } - } - - pub fn track( - &mut self, - subscription: Subscription, - ) { - let futures = self - .subscriptions - .update(subscription, self.receiver.clone()); - - for future in futures { - self.executor.spawn(future); - } - } - - pub fn broadcast(&mut self, event: Event) { - self.subscriptions.broadcast(event); - } -} diff --git a/core/src/runtime/executor.rs b/core/src/runtime/executor.rs deleted file mode 100644 index d171c6d5..00000000 --- a/core/src/runtime/executor.rs +++ /dev/null @@ -1,11 +0,0 @@ -use futures::Future; - -pub trait Executor { - fn new() -> Self; - - fn spawn(&self, future: impl Future + Send + 'static); - - fn enter(&self, f: impl FnOnce() -> R) -> R { - f() - } -} diff --git a/core/src/subscription.rs b/core/src/subscription.rs deleted file mode 100644 index 87e51e48..00000000 --- a/core/src/subscription.rs +++ /dev/null @@ -1,188 +0,0 @@ -//! Listen to external events in your application. -mod tracker; - -pub use tracker::Tracker; - -use futures::stream::BoxStream; - -/// A request to listen to external events. -/// -/// Besides performing async actions on demand with [`Command`], most -/// applications also need to listen to external events passively. -/// -/// A [`Subscription`] is normally provided to some runtime, like a [`Command`], -/// and it will generate events as long as the user keeps requesting it. -/// -/// For instance, you can use a [`Subscription`] to listen to a WebSocket -/// connection, keyboard presses, mouse events, time ticks, etc. -/// -/// This type is normally aliased by runtimes with a specific `Input` and/or -/// `Hasher`. -/// -/// [`Command`]: ../struct.Command.html -/// [`Subscription`]: struct.Subscription.html -pub struct Subscription { - recipes: Vec>>, -} - -impl Subscription -where - H: std::hash::Hasher, -{ - /// Returns an empty [`Subscription`] that will not produce any output. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn none() -> Self { - Self { - recipes: Vec::new(), - } - } - - /// Creates a [`Subscription`] from a [`Recipe`] describing it. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - pub fn from_recipe( - recipe: impl Recipe + 'static, - ) -> Self { - Self { - recipes: vec![Box::new(recipe)], - } - } - - /// Batches all the provided subscriptions and returns the resulting - /// [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn batch( - subscriptions: impl IntoIterator>, - ) -> Self { - Self { - recipes: subscriptions - .into_iter() - .flat_map(|subscription| subscription.recipes) - .collect(), - } - } - - /// Returns the different recipes of the [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn recipes(self) -> Vec>> { - self.recipes - } - - /// Transforms the [`Subscription`] output with the given function. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn map( - mut self, - f: impl Fn(O) -> A + Send + Sync + 'static, - ) -> Subscription - where - H: 'static, - I: 'static, - O: 'static, - A: 'static, - { - let function = std::sync::Arc::new(f); - - Subscription { - recipes: self - .recipes - .drain(..) - .map(|recipe| { - Box::new(Map::new(recipe, function.clone())) - as Box> - }) - .collect(), - } - } -} - -impl std::fmt::Debug for Subscription { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Subscription").finish() - } -} - -/// The description of a [`Subscription`]. -/// -/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used -/// by runtimes to run and identify subscriptions. You can use it to create your -/// own! -/// -/// [`Subscription`]: struct.Subscription.html -/// [`Recipe`]: trait.Recipe.html -pub trait Recipe { - /// The events that will be produced by a [`Subscription`] with this - /// [`Recipe`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - type Output; - - /// Hashes the [`Recipe`]. - /// - /// This is used by runtimes to uniquely identify a [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - fn hash(&self, state: &mut Hasher); - - /// Executes the [`Recipe`] and produces the stream of events of its - /// [`Subscription`]. - /// - /// It receives some generic `Input`, which is normally defined by runtimes. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - fn stream( - self: Box, - input: BoxStream<'static, Input>, - ) -> BoxStream<'static, Self::Output>; -} - -struct Map { - recipe: Box>, - mapper: std::sync::Arc B + Send + Sync>, -} - -impl Map { - fn new( - recipe: Box>, - mapper: std::sync::Arc B + Send + Sync + 'static>, - ) -> Self { - Map { recipe, mapper } - } -} - -impl Recipe for Map -where - A: 'static, - B: 'static, - H: std::hash::Hasher, -{ - type Output = B; - - fn hash(&self, state: &mut H) { - use std::hash::Hash; - - std::any::TypeId::of::().hash(state); - self.recipe.hash(state); - } - - fn stream( - self: Box, - input: BoxStream<'static, I>, - ) -> futures::stream::BoxStream<'static, Self::Output> { - use futures::StreamExt; - - let mapper = self.mapper; - - self.recipe - .stream(input) - .map(move |element| mapper(element)) - .boxed() - } -} diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs deleted file mode 100644 index a942b619..00000000 --- a/core/src/subscription/tracker.rs +++ /dev/null @@ -1,112 +0,0 @@ -use crate::Subscription; - -use futures::{future::BoxFuture, sink::Sink}; -use std::collections::HashMap; -use std::marker::PhantomData; - -#[derive(Debug)] -pub struct Tracker { - subscriptions: HashMap>, - _hasher: PhantomData, -} - -#[derive(Debug)] -pub struct Execution { - _cancel: futures::channel::oneshot::Sender<()>, - listener: Option>, -} - -impl Tracker -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ - pub fn new() -> Self { - Self { - subscriptions: HashMap::new(), - _hasher: PhantomData, - } - } - - pub fn update( - &mut self, - subscription: Subscription, - receiver: Receiver, - ) -> Vec> - where - Message: 'static + Send, - Receiver: 'static - + Sink - + Unpin - + Send - + Clone, - { - use futures::{future::FutureExt, stream::StreamExt}; - - let mut futures = Vec::new(); - - let recipes = subscription.recipes(); - let mut alive = std::collections::HashSet::new(); - - for recipe in recipes { - let id = { - let mut hasher = Hasher::default(); - recipe.hash(&mut hasher); - - hasher.finish() - }; - - let _ = alive.insert(id); - - if self.subscriptions.contains_key(&id) { - continue; - } - - let (cancel, cancelled) = futures::channel::oneshot::channel(); - - // TODO: Use bus if/when it supports async - let (event_sender, event_receiver) = - futures::channel::mpsc::channel(100); - - let stream = recipe.stream(event_receiver.boxed()); - - let future = futures::future::select( - cancelled, - stream.map(Ok).forward(receiver.clone()), - ) - .map(|_| ()); - - let _ = self.subscriptions.insert( - id, - Execution { - _cancel: cancel, - listener: if event_sender.is_closed() { - None - } else { - Some(event_sender) - }, - }, - ); - - futures.push(future.boxed()); - } - - self.subscriptions.retain(|id, _| alive.contains(&id)); - - futures - } - - pub fn broadcast(&mut self, event: Event) { - self.subscriptions - .values_mut() - .filter_map(|connection| connection.listener.as_mut()) - .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { - log::error!( - "Error sending event to subscription: {:?}", - error - ); - } - }); - } -} diff --git a/futures/Cargo.toml b/futures/Cargo.toml new file mode 100644 index 00000000..fe0d378c --- /dev/null +++ b/futures/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "iced_futures" +version = "0.1.0-alpha" +authors = ["Héctor Ramón Jiménez "] +edition = "2018" +description = "Commands, subscriptions, and runtimes for Iced" +license = "MIT" +repository = "https://github.com/hecrj/iced" +documentation = "https://docs.rs/iced_futures" +keywords = ["gui", "ui", "graphics", "interface", "futures"] +categories = ["gui"] + +[dependencies] +log = "0.4" + +[dependencies.futures] +version = "0.3" +features = ["thread-pool"] + +[dependencies.tokio] +version = "0.2" +optional = true +features = ["rt-core"] diff --git a/futures/src/command.rs b/futures/src/command.rs new file mode 100644 index 00000000..e7885fb8 --- /dev/null +++ b/futures/src/command.rs @@ -0,0 +1,100 @@ +use futures::future::{BoxFuture, Future, FutureExt}; + +/// A collection of async operations. +/// +/// You should be able to turn a future easily into a [`Command`], either by +/// using the `From` trait or [`Command::perform`]. +/// +/// [`Command`]: struct.Command.html +pub struct Command { + futures: Vec>, +} + +impl Command { + /// Creates an empty [`Command`]. + /// + /// In other words, a [`Command`] that does nothing. + /// + /// [`Command`]: struct.Command.html + pub fn none() -> Self { + Self { + futures: Vec::new(), + } + } + + /// Creates a [`Command`] that performs the action of the given future. + /// + /// [`Command`]: struct.Command.html + pub fn perform( + future: impl Future + 'static + Send, + f: impl Fn(T) -> A + 'static + Send, + ) -> Command { + Command { + futures: vec![future.map(f).boxed()], + } + } + + /// Applies a transformation to the result of a [`Command`]. + /// + /// [`Command`]: struct.Command.html + pub fn map( + mut self, + f: impl Fn(T) -> A + 'static + Send + Sync, + ) -> Command + where + T: 'static, + { + let f = std::sync::Arc::new(f); + + Command { + futures: self + .futures + .drain(..) + .map(|future| { + let f = f.clone(); + + future.map(move |result| f(result)).boxed() + }) + .collect(), + } + } + + /// Creates a [`Command`] that performs the actions of all the given + /// commands. + /// + /// Once this command is run, all the commands will be exectued at once. + /// + /// [`Command`]: struct.Command.html + pub fn batch(commands: impl IntoIterator>) -> Self { + Self { + futures: commands + .into_iter() + .flat_map(|command| command.futures) + .collect(), + } + } + + /// Converts a [`Command`] into its underlying list of futures. + /// + /// [`Command`]: struct.Command.html + pub fn futures(self) -> Vec> { + self.futures + } +} + +impl From for Command +where + A: Future + 'static + Send, +{ + fn from(future: A) -> Self { + Self { + futures: vec![future.boxed()], + } + } +} + +impl std::fmt::Debug for Command { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Command").finish() + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs new file mode 100644 index 00000000..f6bcf85a --- /dev/null +++ b/futures/src/lib.rs @@ -0,0 +1,8 @@ +mod command; + +pub mod runtime; +pub mod subscription; + +pub use command::Command; +pub use runtime::Runtime; +pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..bc1ad8ac --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,79 @@ +//! Run commands and subscriptions. +mod executor; + +pub use executor::Executor; + +use crate::{subscription, Command, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Runtime { + executor: Executor, + receiver: Receiver, + subscriptions: subscription::Tracker, + _message: PhantomData, +} + +impl + Runtime +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Receiver: Sink + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + pub fn new(executor: Executor, receiver: Receiver) -> Self { + Self { + executor, + receiver, + subscriptions: subscription::Tracker::new(), + _message: PhantomData, + } + } + + pub fn enter(&self, f: impl FnOnce() -> R) -> R { + self.executor.enter(f) + } + + pub fn spawn(&mut self, command: Command) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut receiver = self.receiver.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = receiver.send(message).await; + + () + } + })); + } + } + + pub fn track( + &mut self, + subscription: Subscription, + ) { + let futures = self + .subscriptions + .update(subscription, self.receiver.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs new file mode 100644 index 00000000..855aa105 --- /dev/null +++ b/futures/src/runtime/executor.rs @@ -0,0 +1,26 @@ +use futures::Future; + +pub trait Executor { + fn spawn(&self, future: impl Future + Send + 'static); + + fn enter(&self, f: impl FnOnce() -> R) -> R { + f() + } +} + +impl Executor for futures::executor::ThreadPool { + fn spawn(&self, future: impl Future + Send + 'static) { + self.spawn_ok(future); + } +} + +#[cfg(feature = "tokio")] +impl Executor for tokio::runtime::Runtime { + fn spawn(&self, future: impl Future + Send + 'static) { + let _ = tokio::runtime::Runtime::spawn(self, future); + } + + fn enter(&self, f: impl FnOnce() -> R) -> R { + tokio::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs new file mode 100644 index 00000000..87e51e48 --- /dev/null +++ b/futures/src/subscription.rs @@ -0,0 +1,188 @@ +//! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; + +/// A request to listen to external events. +/// +/// Besides performing async actions on demand with [`Command`], most +/// applications also need to listen to external events passively. +/// +/// A [`Subscription`] is normally provided to some runtime, like a [`Command`], +/// and it will generate events as long as the user keeps requesting it. +/// +/// For instance, you can use a [`Subscription`] to listen to a WebSocket +/// connection, keyboard presses, mouse events, time ticks, etc. +/// +/// This type is normally aliased by runtimes with a specific `Input` and/or +/// `Hasher`. +/// +/// [`Command`]: ../struct.Command.html +/// [`Subscription`]: struct.Subscription.html +pub struct Subscription { + recipes: Vec>>, +} + +impl Subscription +where + H: std::hash::Hasher, +{ + /// Returns an empty [`Subscription`] that will not produce any output. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn none() -> Self { + Self { + recipes: Vec::new(), + } + } + + /// Creates a [`Subscription`] from a [`Recipe`] describing it. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + pub fn from_recipe( + recipe: impl Recipe + 'static, + ) -> Self { + Self { + recipes: vec![Box::new(recipe)], + } + } + + /// Batches all the provided subscriptions and returns the resulting + /// [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn batch( + subscriptions: impl IntoIterator>, + ) -> Self { + Self { + recipes: subscriptions + .into_iter() + .flat_map(|subscription| subscription.recipes) + .collect(), + } + } + + /// Returns the different recipes of the [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn recipes(self) -> Vec>> { + self.recipes + } + + /// Transforms the [`Subscription`] output with the given function. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn map( + mut self, + f: impl Fn(O) -> A + Send + Sync + 'static, + ) -> Subscription + where + H: 'static, + I: 'static, + O: 'static, + A: 'static, + { + let function = std::sync::Arc::new(f); + + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(Map::new(recipe, function.clone())) + as Box> + }) + .collect(), + } + } +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription").finish() + } +} + +/// The description of a [`Subscription`]. +/// +/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used +/// by runtimes to run and identify subscriptions. You can use it to create your +/// own! +/// +/// [`Subscription`]: struct.Subscription.html +/// [`Recipe`]: trait.Recipe.html +pub trait Recipe { + /// The events that will be produced by a [`Subscription`] with this + /// [`Recipe`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + type Output; + + /// Hashes the [`Recipe`]. + /// + /// This is used by runtimes to uniquely identify a [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn hash(&self, state: &mut Hasher); + + /// Executes the [`Recipe`] and produces the stream of events of its + /// [`Subscription`]. + /// + /// It receives some generic `Input`, which is normally defined by runtimes. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn stream( + self: Box, + input: BoxStream<'static, Input>, + ) -> BoxStream<'static, Self::Output>; +} + +struct Map { + recipe: Box>, + mapper: std::sync::Arc B + Send + Sync>, +} + +impl Map { + fn new( + recipe: Box>, + mapper: std::sync::Arc B + Send + Sync + 'static>, + ) -> Self { + Map { recipe, mapper } + } +} + +impl Recipe for Map +where + A: 'static, + B: 'static, + H: std::hash::Hasher, +{ + type Output = B; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: BoxStream<'static, I>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::StreamExt; + + let mapper = self.mapper; + + self.recipe + .stream(input) + .map(move |element| mapper(element)) + .boxed() + } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..a942b619 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker { + subscriptions: HashMap>, + _hasher: PhantomData, +} + +#[derive(Debug)] +pub struct Execution { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Tracker +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update( + &mut self, + subscription: Subscription, + receiver: Receiver, + ) -> Vec> + where + Message: 'static + Send, + Receiver: 'static + + Sink + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(receiver.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} diff --git a/native/Cargo.toml b/native/Cargo.toml index a31b6627..57a869e2 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -8,8 +8,15 @@ license = "MIT" repository = "https://github.com/hecrj/iced" [dependencies] -iced_core = { version = "0.1.0", path = "../core", features = ["command", "subscription"] } twox-hash = "1.5" raw-window-handle = "0.3" unicode-segmentation = "1.6" futures = "0.3" + +[dependencies.iced_core] +version = "0.1.0" +path = "../core" + +[dependencies.iced_futures] +version = "0.1.0-alpha" +path = "../futures" diff --git a/native/src/lib.rs b/native/src/lib.rs index 340b9ea7..7730c6a3 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -42,6 +42,7 @@ pub mod input; pub mod layout; pub mod renderer; +pub mod runtime; pub mod subscription; pub mod widget; pub mod window; @@ -55,9 +56,10 @@ mod size; mod user_interface; pub use iced_core::{ - Align, Background, Color, Command, Font, HorizontalAlignment, Length, - Point, Rectangle, Vector, VerticalAlignment, + Align, Background, Color, Font, HorizontalAlignment, Length, Point, + Rectangle, Vector, VerticalAlignment, }; +pub use iced_futures::Command; pub use clipboard::Clipboard; pub use element::Element; @@ -66,6 +68,7 @@ pub use hasher::Hasher; pub use layout::Layout; pub use mouse_cursor::MouseCursor; pub use renderer::Renderer; +pub use runtime::Runtime; pub use size::Size; pub use subscription::Subscription; pub use user_interface::{Cache, UserInterface}; diff --git a/native/src/runtime.rs b/native/src/runtime.rs new file mode 100644 index 00000000..2b3abbf1 --- /dev/null +++ b/native/src/runtime.rs @@ -0,0 +1,14 @@ +//! Run commands and subscriptions. +use crate::{Event, Hasher}; + +/// A native runtime with a generic executor and receiver of results. +/// +/// It can be used by shells to easily spawn a [`Command`] or track a +/// [`Subscription`]. +/// +/// [`Command`]: ../struct.Command.html +/// [`Subscription`]: ../struct.Subscription.html +pub type Runtime = + iced_futures::Runtime; + +pub use iced_futures::runtime::Executor; diff --git a/native/src/subscription.rs b/native/src/subscription.rs index cd0822c1..43f1758a 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -15,7 +15,7 @@ use futures::stream::BoxStream; /// /// [`Command`]: ../struct.Command.html /// [`Subscription`]: struct.Subscription.html -pub type Subscription = iced_core::Subscription; +pub type Subscription = iced_futures::Subscription; /// A stream of runtime events. /// @@ -27,9 +27,9 @@ pub type EventStream = BoxStream<'static, Event>; /// A native [`Subscription`] tracker. /// /// [`Subscription`]: type.Subscription.html -pub type Tracker = iced_core::subscription::Tracker; +pub type Tracker = iced_futures::subscription::Tracker; -pub use iced_core::subscription::Recipe; +pub use iced_futures::subscription::Recipe; mod events; diff --git a/web/Cargo.toml b/web/Cargo.toml index 605c7462..ea092575 100644 --- a/web/Cargo.toml +++ b/web/Cargo.toml @@ -15,12 +15,19 @@ categories = ["web-programming"] maintenance = { status = "actively-developed" } [dependencies] -iced_core = { version = "0.1.0", path = "../core", features = ["command", "subscription"] } dodrio = "0.1.0" wasm-bindgen = "0.2.51" wasm-bindgen-futures = "0.4" futures = "0.3" +[dependencies.iced_core] +version = "0.1.0" +path = "../core" + +[dependencies.iced_futures] +version = "0.1.0-alpha" +path = "../futures" + [dependencies.web-sys] version = "0.3.27" features = [ diff --git a/web/src/lib.rs b/web/src/lib.rs index 7ea22e85..b183c390 100644 --- a/web/src/lib.rs +++ b/web/src/lib.rs @@ -72,9 +72,10 @@ pub use dodrio; pub use element::Element; pub use hasher::Hasher; pub use iced_core::{ - Align, Background, Color, Command, Font, HorizontalAlignment, Length, + Align, Background, Color, Font, HorizontalAlignment, Length, VerticalAlignment, }; +pub use iced_futures::Command; pub use style::Style; pub use subscription::Subscription; pub use widget::*; @@ -148,7 +149,6 @@ pub trait Application { } } - struct Instance { title: String, ui: Rc>>>, @@ -167,7 +167,7 @@ impl Clone for Instance { impl Instance where - Message: 'static + Message: 'static, { fn new(ui: impl Application + 'static) -> Self { Self { diff --git a/web/src/subscription.rs b/web/src/subscription.rs index 4638c8ab..6b8415c0 100644 --- a/web/src/subscription.rs +++ b/web/src/subscription.rs @@ -14,6 +14,6 @@ use crate::Hasher; /// /// [`Command`]: ../struct.Command.html /// [`Subscription`]: struct.Subscription.html -pub type Subscription = iced_core::Subscription; +pub type Subscription = iced_futures::Subscription; -pub use iced_core::subscription::Recipe; +pub use iced_futures::subscription::Recipe; diff --git a/winit/Cargo.toml b/winit/Cargo.toml index 5727f8cf..3ed37dd5 100644 --- a/winit/Cargo.toml +++ b/winit/Cargo.toml @@ -14,11 +14,17 @@ categories = ["gui"] debug = [] [dependencies] -iced_native = { version = "0.1.0-alpha", path = "../native" } winit = { version = "0.20.0-alpha3", git = "https://github.com/hecrj/winit", rev = "709808eb4e69044705fcb214bcc30556db761405"} -window_clipboard = { git = "https://github.com/hecrj/window_clipboard", rev = "22c6dd6c04cd05d528029b50a30c56417cd4bebf" } -futures = { version = "0.3", features = ["thread-pool"] } log = "0.4" +futures = { version = "0.3", features = ["thread-pool"] } + +[dependencies.iced_native] +version = "0.1.0-alpha" +path = "../native" + +[dependencies.window_clipboard] +git = "https://github.com/hecrj/window_clipboard" +rev = "22c6dd6c04cd05d528029b50a30c56417cd4bebf" [target.'cfg(target_os = "windows")'.dependencies.winapi] version = "0.3.6" diff --git a/winit/src/application.rs b/winit/src/application.rs index a14924ac..076ac092 100644 --- a/winit/src/application.rs +++ b/winit/src/application.rs @@ -1,9 +1,10 @@ use crate::{ conversion, input::{keyboard, mouse}, - subscription, window, Cache, Clipboard, Command, Debug, Element, Event, - Mode, MouseCursor, Settings, Size, Subscription, UserInterface, + window, Cache, Clipboard, Command, Debug, Element, Event, Mode, + MouseCursor, Proxy, Settings, Size, Subscription, UserInterface, }; +use iced_native::Runtime; /// An interactive, native cross-platform application. /// @@ -109,17 +110,19 @@ pub trait Application: Sized { debug.startup_started(); let event_loop = EventLoop::with_user_event(); - let proxy = event_loop.create_proxy(); - let mut thread_pool = - futures::executor::ThreadPool::new().expect("Create thread pool"); - let mut subscription_pool = subscription::Pool::new(); + let mut runtime = { + let thread_pool = futures::executor::ThreadPool::new() + .expect("Create thread pool"); + + Runtime::new(thread_pool, Proxy::new(event_loop.create_proxy())) + }; let mut external_messages = Vec::new(); let (mut application, init_command) = Self::new(); - spawn(init_command, &mut thread_pool, &proxy); + runtime.spawn(init_command); let subscription = application.subscription(); - subscription_pool.update(subscription, &mut thread_pool, &proxy); + runtime.track(subscription); let mut title = application.title(); let mut mode = application.mode(); @@ -212,7 +215,7 @@ pub trait Application: Sized { events .iter() .cloned() - .for_each(|event| subscription_pool.broadcast_event(event)); + .for_each(|event| runtime.broadcast(event)); let mut messages = user_interface.update( &renderer, @@ -241,17 +244,15 @@ pub trait Application: Sized { debug.log_message(&message); debug.update_started(); - let command = application.update(message); - spawn(command, &mut thread_pool, &proxy); + let command = + runtime.enter(|| application.update(message)); + runtime.spawn(command); debug.update_finished(); } - let subscription = application.subscription(); - subscription_pool.update( - subscription, - &mut thread_pool, - &proxy, - ); + let subscription = + runtime.enter(|| application.subscription()); + runtime.track(subscription); // Update window title let new_title = application.title(); @@ -463,28 +464,6 @@ fn to_physical(size: winit::dpi::LogicalSize, dpi: f64) -> (u16, u16) { ) } -fn spawn( - command: Command, - thread_pool: &mut futures::executor::ThreadPool, - proxy: &winit::event_loop::EventLoopProxy, -) { - use futures::FutureExt; - - let futures = command.futures(); - - for future in futures { - let proxy = proxy.clone(); - - let future = future.map(move |message| { - proxy - .send_event(message) - .expect("Send command result to event loop"); - }); - - thread_pool.spawn_ok(future); - } -} - // As defined in: http://www.unicode.org/faq/private_use.html // TODO: Remove once https://github.com/rust-windowing/winit/pull/1254 lands fn is_private_use_character(c: char) -> bool { diff --git a/winit/src/lib.rs b/winit/src/lib.rs index 9000f977..056ae8f0 100644 --- a/winit/src/lib.rs +++ b/winit/src/lib.rs @@ -31,7 +31,7 @@ pub mod settings; mod application; mod clipboard; mod mode; -mod subscription; +mod proxy; // We disable debug capabilities on release builds unless the `debug` feature // is explicitly enabled. @@ -48,3 +48,4 @@ pub use settings::Settings; use clipboard::Clipboard; use debug::Debug; +use proxy::Proxy; diff --git a/winit/src/proxy.rs b/winit/src/proxy.rs new file mode 100644 index 00000000..7e8dee98 --- /dev/null +++ b/winit/src/proxy.rs @@ -0,0 +1,57 @@ +use futures::{ + task::{Context, Poll}, + Sink, +}; +use std::pin::Pin; + +pub struct Proxy { + raw: winit::event_loop::EventLoopProxy, +} + +impl Clone for Proxy { + fn clone(&self) -> Self { + Self { + raw: self.raw.clone(), + } + } +} + +impl Proxy { + pub fn new(raw: winit::event_loop::EventLoopProxy) -> Self { + Self { raw } + } +} + +impl Sink for Proxy { + type Error = core::convert::Infallible; + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send( + self: Pin<&mut Self>, + message: Message, + ) -> Result<(), Self::Error> { + let _ = self.raw.send_event(message); + + Ok(()) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/winit/src/subscription.rs b/winit/src/subscription.rs deleted file mode 100644 index bad68d55..00000000 --- a/winit/src/subscription.rs +++ /dev/null @@ -1,97 +0,0 @@ -use iced_native::{Event, Hasher, Subscription}; -use std::collections::HashMap; - -pub struct Pool { - alive: HashMap, -} - -pub struct Handle { - _cancel: futures::channel::oneshot::Sender<()>, - listener: Option>, -} - -impl Pool { - pub fn new() -> Self { - Self { - alive: HashMap::new(), - } - } - - pub fn update( - &mut self, - subscription: Subscription, - thread_pool: &mut futures::executor::ThreadPool, - proxy: &winit::event_loop::EventLoopProxy, - ) { - use futures::{future::FutureExt, stream::StreamExt}; - - let recipes = subscription.recipes(); - let mut alive = std::collections::HashSet::new(); - - for recipe in recipes { - let id = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - recipe.hash(&mut hasher); - - hasher.finish() - }; - - let _ = alive.insert(id); - - if !self.alive.contains_key(&id) { - let (cancel, cancelled) = futures::channel::oneshot::channel(); - - // TODO: Use bus if/when it supports async - let (event_sender, event_receiver) = - futures::channel::mpsc::channel(100); - - let stream = recipe.stream(event_receiver.boxed()); - let proxy = proxy.clone(); - - let future = futures::future::select( - cancelled, - stream.for_each(move |message| { - proxy - .send_event(message) - .expect("Send subscription result to event loop"); - - futures::future::ready(()) - }), - ) - .map(|_| ()); - - thread_pool.spawn_ok(future); - - let _ = self.alive.insert( - id, - Handle { - _cancel: cancel, - listener: if event_sender.is_closed() { - None - } else { - Some(event_sender) - }, - }, - ); - } - } - - self.alive.retain(|id, _| alive.contains(&id)); - } - - pub fn broadcast_event(&mut self, event: Event) { - self.alive - .values_mut() - .filter_map(|connection| connection.listener.as_mut()) - .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { - log::error!( - "Error sending event to subscription: {:?}", - error - ); - } - }); - } -} -- cgit