diff options
Diffstat (limited to 'futures/src')
| -rw-r--r-- | futures/src/backend/native/async_std.rs | 14 | ||||
| -rw-r--r-- | futures/src/backend/native/smol.rs | 14 | ||||
| -rw-r--r-- | futures/src/backend/native/tokio.rs | 14 | ||||
| -rw-r--r-- | futures/src/backend/wasm/wasm_bindgen.rs | 14 | ||||
| -rw-r--r-- | futures/src/command.rs | 70 | ||||
| -rw-r--r-- | futures/src/lib.rs | 3 | ||||
| -rw-r--r-- | futures/src/runtime.rs | 25 | ||||
| -rw-r--r-- | futures/src/subscription.rs | 349 | ||||
| -rw-r--r-- | futures/src/subscription/tracker.rs | 51 | 
9 files changed, 349 insertions, 205 deletions
| diff --git a/futures/src/backend/native/async_std.rs b/futures/src/backend/native/async_std.rs index b324dbf1..52b0e914 100644 --- a/futures/src/backend/native/async_std.rs +++ b/futures/src/backend/native/async_std.rs @@ -18,28 +18,26 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. +    use crate::core::Hasher;      use crate::subscription::{self, Subscription};      /// Returns a [`Subscription`] that produces messages at a set interval.      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every<H: std::hash::Hasher, E>( +    pub fn every(          duration: std::time::Duration, -    ) -> Subscription<H, E, std::time::Instant> { +    ) -> Subscription<std::time::Instant> {          Subscription::from_recipe(Every(duration))      }      #[derive(Debug)]      struct Every(std::time::Duration); -    impl<H, E> subscription::Recipe<H, E> for Every -    where -        H: std::hash::Hasher, -    { +    impl subscription::Recipe for Every {          type Output = std::time::Instant; -        fn hash(&self, state: &mut H) { +        fn hash(&self, state: &mut Hasher) {              use std::hash::Hash;              std::any::TypeId::of::<Self>().hash(state); @@ -48,7 +46,7 @@ pub mod time {          fn stream(              self: Box<Self>, -            _input: futures::stream::BoxStream<'static, E>, +            _input: subscription::EventStream,          ) -> futures::stream::BoxStream<'static, Self::Output> {              use futures::stream::StreamExt; diff --git a/futures/src/backend/native/smol.rs b/futures/src/backend/native/smol.rs index d5201cde..30bc8291 100644 --- a/futures/src/backend/native/smol.rs +++ b/futures/src/backend/native/smol.rs @@ -19,28 +19,26 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. +    use crate::core::Hasher;      use crate::subscription::{self, Subscription};      /// Returns a [`Subscription`] that produces messages at a set interval.      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every<H: std::hash::Hasher, E>( +    pub fn every(          duration: std::time::Duration, -    ) -> Subscription<H, E, std::time::Instant> { +    ) -> Subscription<std::time::Instant> {          Subscription::from_recipe(Every(duration))      }      #[derive(Debug)]      struct Every(std::time::Duration); -    impl<H, E> subscription::Recipe<H, E> for Every -    where -        H: std::hash::Hasher, -    { +    impl subscription::Recipe for Every {          type Output = std::time::Instant; -        fn hash(&self, state: &mut H) { +        fn hash(&self, state: &mut Hasher) {              use std::hash::Hash;              std::any::TypeId::of::<Self>().hash(state); @@ -49,7 +47,7 @@ pub mod time {          fn stream(              self: Box<Self>, -            _input: futures::stream::BoxStream<'static, E>, +            _input: subscription::EventStream,          ) -> futures::stream::BoxStream<'static, Self::Output> {              use futures::stream::StreamExt; diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index dd818bd1..4698a105 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,28 +22,26 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. +    use crate::core::Hasher;      use crate::subscription::{self, Subscription};      /// Returns a [`Subscription`] that produces messages at a set interval.      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every<H: std::hash::Hasher, E>( +    pub fn every(          duration: std::time::Duration, -    ) -> Subscription<H, E, std::time::Instant> { +    ) -> Subscription<std::time::Instant> {          Subscription::from_recipe(Every(duration))      }      #[derive(Debug)]      struct Every(std::time::Duration); -    impl<H, E> subscription::Recipe<H, E> for Every -    where -        H: std::hash::Hasher, -    { +    impl subscription::Recipe for Every {          type Output = std::time::Instant; -        fn hash(&self, state: &mut H) { +        fn hash(&self, state: &mut Hasher) {              use std::hash::Hash;              std::any::TypeId::of::<Self>().hash(state); @@ -52,7 +50,7 @@ pub mod time {          fn stream(              self: Box<Self>, -            _input: futures::stream::BoxStream<'static, E>, +            _input: subscription::EventStream,          ) -> futures::stream::BoxStream<'static, Self::Output> {              use futures::stream::StreamExt; diff --git a/futures/src/backend/wasm/wasm_bindgen.rs b/futures/src/backend/wasm/wasm_bindgen.rs index b726501a..2666f1b4 100644 --- a/futures/src/backend/wasm/wasm_bindgen.rs +++ b/futures/src/backend/wasm/wasm_bindgen.rs @@ -16,6 +16,7 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. +    use crate::core::Hasher;      use crate::subscription::{self, Subscription};      use crate::BoxStream; @@ -23,22 +24,19 @@ pub mod time {      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every<H: std::hash::Hasher, E>( +    pub fn every(          duration: std::time::Duration, -    ) -> Subscription<H, E, wasm_timer::Instant> { +    ) -> Subscription<wasm_timer::Instant> {          Subscription::from_recipe(Every(duration))      }      #[derive(Debug)]      struct Every(std::time::Duration); -    impl<H, E> subscription::Recipe<H, E> for Every -    where -        H: std::hash::Hasher, -    { +    impl subscription::Recipe for Every {          type Output = wasm_timer::Instant; -        fn hash(&self, state: &mut H) { +        fn hash(&self, state: &mut Hasher) {              use std::hash::Hash;              std::any::TypeId::of::<Self>().hash(state); @@ -47,7 +45,7 @@ pub mod time {          fn stream(              self: Box<Self>, -            _input: BoxStream<E>, +            _input: subscription::EventStream,          ) -> BoxStream<Self::Output> {              use futures::stream::StreamExt; diff --git a/futures/src/command.rs b/futures/src/command.rs deleted file mode 100644 index 3d1ec3f9..00000000 --- a/futures/src/command.rs +++ /dev/null @@ -1,70 +0,0 @@ -/// A set of asynchronous actions to be performed by some runtime. -#[must_use = "`Command` must be returned to runtime to take effect"] -#[derive(Debug)] -pub struct Command<T>(Internal<T>); - -#[derive(Debug)] -enum Internal<T> { -    None, -    Single(T), -    Batch(Vec<T>), -} - -impl<T> Command<T> { -    /// Creates an empty [`Command`]. -    /// -    /// In other words, a [`Command`] that does nothing. -    pub const fn none() -> Self { -        Self(Internal::None) -    } - -    /// Creates a [`Command`] that performs a single action. -    pub const fn single(action: T) -> Self { -        Self(Internal::Single(action)) -    } - -    /// Creates a [`Command`] that performs the actions of all the given -    /// commands. -    /// -    /// Once this command is run, all the commands will be executed at once. -    pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self { -        let mut batch = Vec::new(); - -        for Command(command) in commands { -            match command { -                Internal::None => {} -                Internal::Single(command) => batch.push(command), -                Internal::Batch(commands) => batch.extend(commands), -            } -        } - -        Self(Internal::Batch(batch)) -    } - -    /// Applies a transformation to the result of a [`Command`]. -    pub fn map<A>(self, f: impl Fn(T) -> A) -> Command<A> -    where -        T: 'static, -    { -        let Command(command) = self; - -        match command { -            Internal::None => Command::none(), -            Internal::Single(action) => Command::single(f(action)), -            Internal::Batch(batch) => { -                Command(Internal::Batch(batch.into_iter().map(f).collect())) -            } -        } -    } - -    /// Returns all of the actions of the [`Command`]. -    pub fn actions(self) -> Vec<T> { -        let Command(command) = self; - -        match command { -            Internal::None => Vec::new(), -            Internal::Single(action) => vec![action], -            Internal::Batch(batch) => batch, -        } -    } -} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c0982db7..397fc2d2 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -18,8 +18,8 @@  #![allow(clippy::inherent_to_string, clippy::type_complexity)]  #![cfg_attr(docsrs, feature(doc_cfg))]  pub use futures; +pub use iced_core as core; -mod command;  mod maybe_send;  mod runtime; @@ -27,7 +27,6 @@ pub mod backend;  pub mod executor;  pub mod subscription; -pub use command::Command;  pub use executor::Executor;  pub use maybe_send::MaybeSend;  pub use platform::*; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 24f9f241..2241a494 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,6 +1,7 @@  //! Run commands and keep track of subscriptions. +use crate::core::event::{self, Event};  use crate::subscription; -use crate::{BoxFuture, Executor, MaybeSend, Subscription}; +use crate::{BoxFuture, Executor, MaybeSend};  use futures::{channel::mpsc, Sink};  use std::marker::PhantomData; @@ -12,18 +13,15 @@ use std::marker::PhantomData;  ///  /// [`Command`]: crate::Command  #[derive(Debug)] -pub struct Runtime<Hasher, Event, Executor, Sender, Message> { +pub struct Runtime<Executor, Sender, Message> {      executor: Executor,      sender: Sender, -    subscriptions: subscription::Tracker<Hasher, Event>, +    subscriptions: subscription::Tracker,      _message: PhantomData<Message>,  } -impl<Hasher, Event, Executor, Sender, Message> -    Runtime<Hasher, Event, Executor, Sender, Message> +impl<Executor, Sender, Message> Runtime<Executor, Sender, Message>  where -    Hasher: std::hash::Hasher + Default, -    Event: Send + Clone + 'static,      Executor: self::Executor,      Sender: Sink<Message, Error = mpsc::SendError>          + Unpin @@ -79,7 +77,9 @@ where      /// [`Tracker::update`]: subscription::Tracker::update      pub fn track(          &mut self, -        subscription: Subscription<Hasher, Event, Message>, +        recipes: impl IntoIterator< +            Item = Box<dyn subscription::Recipe<Output = Message>>, +        >,      ) {          let Runtime {              executor, @@ -88,8 +88,9 @@ where              ..          } = self; -        let futures = executor -            .enter(|| subscriptions.update(subscription, sender.clone())); +        let futures = executor.enter(|| { +            subscriptions.update(recipes.into_iter(), sender.clone()) +        });          for future in futures {              executor.spawn(future); @@ -102,7 +103,7 @@ where      /// See [`Tracker::broadcast`] to learn more.      ///      /// [`Tracker::broadcast`]: subscription::Tracker::broadcast -    pub fn broadcast(&mut self, event: Event) { -        self.subscriptions.broadcast(event); +    pub fn broadcast(&mut self, event: Event, status: event::Status) { +        self.subscriptions.broadcast(event, status);      }  } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 18c66a5a..801c2694 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -3,7 +3,20 @@ mod tracker;  pub use tracker::Tracker; -use crate::BoxStream; +use crate::core::event::{self, Event}; +use crate::core::window; +use crate::core::Hasher; +use crate::futures::{Future, Stream}; +use crate::{BoxStream, MaybeSend}; + +use futures::channel::mpsc; +use futures::never::Never; +use std::hash::Hash; + +/// A stream of runtime events. +/// +/// It is the input of a [`Subscription`]. +pub type EventStream = BoxStream<(Event, event::Status)>;  /// A request to listen to external events.  /// @@ -16,19 +29,13 @@ use crate::BoxStream;  /// For instance, you can use a [`Subscription`] to listen to a WebSocket  /// connection, keyboard presses, mouse events, time ticks, etc.  /// -/// This type is normally aliased by runtimes with a specific `Event` and/or -/// `Hasher`. -///  /// [`Command`]: crate::Command  #[must_use = "`Subscription` must be returned to runtime to take effect"] -pub struct Subscription<Hasher, Event, Output> { -    recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>, +pub struct Subscription<Message> { +    recipes: Vec<Box<dyn Recipe<Output = Message>>>,  } -impl<H, E, O> Subscription<H, E, O> -where -    H: std::hash::Hasher, -{ +impl<Message> Subscription<Message> {      /// Returns an empty [`Subscription`] that will not produce any output.      pub fn none() -> Self {          Self { @@ -38,7 +45,7 @@ where      /// Creates a [`Subscription`] from a [`Recipe`] describing it.      pub fn from_recipe( -        recipe: impl Recipe<H, E, Output = O> + 'static, +        recipe: impl Recipe<Output = Message> + 'static,      ) -> Self {          Self {              recipes: vec![Box::new(recipe)], @@ -48,7 +55,7 @@ where      /// Batches all the provided subscriptions and returns the resulting      /// [`Subscription`].      pub fn batch( -        subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>, +        subscriptions: impl IntoIterator<Item = Subscription<Message>>,      ) -> Self {          Self {              recipes: subscriptions @@ -59,18 +66,16 @@ where      }      /// Returns the different recipes of the [`Subscription`]. -    pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> { +    pub fn into_recipes(self) -> Vec<Box<dyn Recipe<Output = Message>>> {          self.recipes      }      /// Adds a value to the [`Subscription`] context.      ///      /// The value will be part of the identity of a [`Subscription`]. -    pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)> +    pub fn with<T>(mut self, value: T) -> Subscription<(T, Message)>      where -        H: 'static, -        E: 'static, -        O: 'static, +        Message: 'static,          T: std::hash::Hash + Clone + Send + Sync + 'static,      {          Subscription { @@ -79,18 +84,16 @@ where                  .drain(..)                  .map(|recipe| {                      Box::new(With::new(recipe, value.clone())) -                        as Box<dyn Recipe<H, E, Output = (T, O)>> +                        as Box<dyn Recipe<Output = (T, Message)>>                  })                  .collect(),          }      }      /// Transforms the [`Subscription`] output with the given function. -    pub fn map<A>(mut self, f: fn(O) -> A) -> Subscription<H, E, A> +    pub fn map<A>(mut self, f: fn(Message) -> A) -> Subscription<A>      where -        H: 'static, -        E: 'static, -        O: 'static, +        Message: 'static,          A: 'static,      {          Subscription { @@ -98,15 +101,14 @@ where                  .recipes                  .drain(..)                  .map(|recipe| { -                    Box::new(Map::new(recipe, f)) -                        as Box<dyn Recipe<H, E, Output = A>> +                    Box::new(Map::new(recipe, f)) as Box<dyn Recipe<Output = A>>                  })                  .collect(),          }      }  } -impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { +impl<Message> std::fmt::Debug for Subscription<Message> {      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {          f.debug_struct("Subscription").finish()      } @@ -129,7 +131,7 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {  /// [examples]: https://github.com/iced-rs/iced/tree/0.9/examples  /// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.9/examples/download_progress  /// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.9/examples/stopwatch -pub trait Recipe<Hasher: std::hash::Hasher, Event> { +pub trait Recipe {      /// The events that will be produced by a [`Subscription`] with this      /// [`Recipe`].      type Output; @@ -141,45 +143,33 @@ pub trait Recipe<Hasher: std::hash::Hasher, Event> {      /// Executes the [`Recipe`] and produces the stream of events of its      /// [`Subscription`]. -    /// -    /// It receives some stream of generic events, which is normally defined by -    /// shells. -    fn stream( -        self: Box<Self>, -        input: BoxStream<Event>, -    ) -> BoxStream<Self::Output>; +    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;  } -struct Map<Hasher, Event, A, B> { -    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, +struct Map<A, B> { +    recipe: Box<dyn Recipe<Output = A>>,      mapper: fn(A) -> B,  } -impl<H, E, A, B> Map<H, E, A, B> { -    fn new( -        recipe: Box<dyn Recipe<H, E, Output = A>>, -        mapper: fn(A) -> B, -    ) -> Self { +impl<A, B> Map<A, B> { +    fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: fn(A) -> B) -> Self {          Map { recipe, mapper }      }  } -impl<H, E, A, B> Recipe<H, E> for Map<H, E, A, B> +impl<A, B> Recipe for Map<A, B>  where      A: 'static,      B: 'static, -    H: std::hash::Hasher,  {      type Output = B; -    fn hash(&self, state: &mut H) { -        use std::hash::Hash; - +    fn hash(&self, state: &mut Hasher) {          self.recipe.hash(state);          self.mapper.hash(state);      } -    fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { +    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {          use futures::StreamExt;          let mapper = self.mapper; @@ -188,34 +178,31 @@ where      }  } -struct With<Hasher, Event, A, B> { -    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, +struct With<A, B> { +    recipe: Box<dyn Recipe<Output = A>>,      value: B,  } -impl<H, E, A, B> With<H, E, A, B> { -    fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self { +impl<A, B> With<A, B> { +    fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {          With { recipe, value }      }  } -impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B> +impl<A, B> Recipe for With<A, B>  where      A: 'static,      B: 'static + std::hash::Hash + Clone + Send + Sync, -    H: std::hash::Hasher,  {      type Output = (B, A); -    fn hash(&self, state: &mut H) { -        use std::hash::Hash; - +    fn hash(&self, state: &mut Hasher) {          std::any::TypeId::of::<B>().hash(state);          self.value.hash(state);          self.recipe.hash(state);      } -    fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { +    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {          use futures::StreamExt;          let value = self.value; @@ -227,3 +214,253 @@ where          )      }  } + +/// Returns a [`Subscription`] to all the ignored runtime events. +/// +/// This subscription will notify your application of any [`Event`] that was +/// not captured by any widget. +pub fn events() -> Subscription<Event> { +    events_with(|event, status| match status { +        event::Status::Ignored => Some(event), +        event::Status::Captured => None, +    }) +} + +/// Returns a [`Subscription`] that filters all the runtime events with the +/// provided function, producing messages accordingly. +/// +/// This subscription will call the provided function for every [`Event`] +/// handled by the runtime. If the function: +/// +/// - Returns `None`, the [`Event`] will be discarded. +/// - Returns `Some` message, the `Message` will be produced. +pub fn events_with<Message>( +    f: fn(Event, event::Status) -> Option<Message>, +) -> Subscription<Message> +where +    Message: 'static + MaybeSend, +{ +    #[derive(Hash)] +    struct EventsWith; + +    Subscription::from_recipe(Runner { +        id: (EventsWith, f), +        spawn: move |events| { +            use futures::future; +            use futures::stream::StreamExt; + +            events.filter_map(move |(event, status)| { +                future::ready(match event { +                    Event::Window(window::Event::RedrawRequested(_)) => None, +                    _ => f(event, status), +                }) +            }) +        }, +    }) +} + +/// Returns a [`Subscription`] that produces a message for every runtime event, +/// including the redraw request events. +/// +/// **Warning:** This [`Subscription`], if unfiltered, may produce messages in +/// an infinite loop. +pub fn raw_events<Message>( +    f: fn(Event, event::Status) -> Option<Message>, +) -> Subscription<Message> +where +    Message: 'static + MaybeSend, +{ +    #[derive(Hash)] +    struct RawEvents; + +    Subscription::from_recipe(Runner { +        id: (RawEvents, f), +        spawn: move |events| { +            use futures::future; +            use futures::stream::StreamExt; + +            events.filter_map(move |(event, status)| { +                future::ready(f(event, status)) +            }) +        }, +    }) +} + +/// Returns a [`Subscription`] that will call the given function to create and +/// asynchronously run the given [`Stream`]. +pub fn run<S, Message>(builder: fn() -> S) -> Subscription<Message> +where +    S: Stream<Item = Message> + MaybeSend + 'static, +    Message: 'static, +{ +    Subscription::from_recipe(Runner { +        id: builder, +        spawn: move |_| builder(), +    }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run the +/// given [`Stream`]. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run_with_id<I, S, Message>(id: I, stream: S) -> Subscription<Message> +where +    I: Hash + 'static, +    S: Stream<Item = Message> + MaybeSend + 'static, +    Message: 'static, +{ +    Subscription::from_recipe(Runner { +        id, +        spawn: move |_| stream, +    }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn unfold<I, T, Fut, Message>( +    id: I, +    initial: T, +    mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription<Message> +where +    I: Hash + 'static, +    T: MaybeSend + 'static, +    Fut: Future<Output = (Message, T)> + MaybeSend + 'static, +    Message: 'static + MaybeSend, +{ +    use futures::future::FutureExt; + +    run_with_id( +        id, +        futures::stream::unfold(initial, move |state| f(state).map(Some)), +    ) +} + +/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] +/// to an [`mpsc::Sender`] with the given bounds. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_futures::subscription::{self, Subscription}; +/// use iced_futures::futures::channel::mpsc; +/// use iced_futures::futures::sink::SinkExt; +/// +/// pub enum Event { +///     Ready(mpsc::Sender<Input>), +///     WorkFinished, +///     // ... +/// } +/// +/// enum Input { +///     DoSomeWork, +///     // ... +/// } +/// +/// enum State { +///     Starting, +///     Ready(mpsc::Receiver<Input>), +/// } +/// +/// fn some_worker() -> Subscription<Event> { +///     struct SomeWorker; +/// +///     subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move { +///         let mut state = State::Starting; +/// +///         loop { +///             match &mut state { +///                 State::Starting => { +///                     // Create channel +///                     let (sender, receiver) = mpsc::channel(100); +/// +///                     // Send the sender back to the application +///                     output.send(Event::Ready(sender)).await; +/// +///                     // We are ready to receive messages +///                     state = State::Ready(receiver); +///                 } +///                 State::Ready(receiver) => { +///                     use iced_futures::futures::StreamExt; +/// +///                     // Read next input sent from `Application` +///                     let input = receiver.select_next_some().await; +/// +///                     match input { +///                         Input::DoSomeWork => { +///                             // Do some async work... +/// +///                             // Finally, we can optionally produce a message to tell the +///                             // `Application` the work is done +///                             output.send(Event::WorkFinished).await; +///                         } +///                     } +///                 } +///             } +///         } +///     }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.9/examples/websocket +pub fn channel<I, Fut, Message>( +    id: I, +    size: usize, +    f: impl Fn(mpsc::Sender<Message>) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription<Message> +where +    I: Hash + 'static, +    Fut: Future<Output = Never> + MaybeSend + 'static, +    Message: 'static + MaybeSend, +{ +    use futures::stream::{self, StreamExt}; + +    Subscription::from_recipe(Runner { +        id, +        spawn: move |_| { +            let (sender, receiver) = mpsc::channel(size); + +            let runner = stream::once(f(sender)).map(|_| unreachable!()); + +            stream::select(receiver, runner) +        }, +    }) +} + +struct Runner<I, F, S, Message> +where +    F: FnOnce(EventStream) -> S, +    S: Stream<Item = Message>, +{ +    id: I, +    spawn: F, +} + +impl<I, S, F, Message> Recipe for Runner<I, F, S, Message> +where +    I: Hash + 'static, +    F: FnOnce(EventStream) -> S, +    S: Stream<Item = Message> + MaybeSend + 'static, +{ +    type Output = Message; + +    fn hash(&self, state: &mut Hasher) { +        std::any::TypeId::of::<I>().hash(state); +        self.id.hash(state); +    } + +    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { +        crate::boxed_stream((self.spawn)(input)) +    } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 9fe110b0..ae71cd25 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,38 +1,35 @@ -use crate::{BoxFuture, MaybeSend, Subscription}; +use crate::core::event::{self, Event}; +use crate::core::Hasher; +use crate::subscription::Recipe; +use crate::{BoxFuture, MaybeSend}; -use futures::{ -    channel::mpsc, -    sink::{Sink, SinkExt}, -}; -use std::{collections::HashMap, marker::PhantomData}; +use futures::channel::mpsc; +use futures::sink::{Sink, SinkExt}; + +use std::collections::HashMap; +use std::hash::Hasher as _;  /// A registry of subscription streams.  ///  /// If you have an application that continuously returns a [`Subscription`],  /// you can use a [`Tracker`] to keep track of the different recipes and keep  /// its executions alive. -#[derive(Debug)] -pub struct Tracker<Hasher, Event> { -    subscriptions: HashMap<u64, Execution<Event>>, -    _hasher: PhantomData<Hasher>, +#[derive(Debug, Default)] +pub struct Tracker { +    subscriptions: HashMap<u64, Execution>,  }  #[derive(Debug)] -pub struct Execution<Event> { +pub struct Execution {      _cancel: futures::channel::oneshot::Sender<()>, -    listener: Option<futures::channel::mpsc::Sender<Event>>, +    listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>,  } -impl<Hasher, Event> Tracker<Hasher, Event> -where -    Hasher: std::hash::Hasher + Default, -    Event: 'static + Send + Clone, -{ +impl Tracker {      /// Creates a new empty [`Tracker`].      pub fn new() -> Self {          Self {              subscriptions: HashMap::new(), -            _hasher: PhantomData,          }      } @@ -56,7 +53,7 @@ where      /// [`Recipe`]: crate::subscription::Recipe      pub fn update<Message, Receiver>(          &mut self, -        subscription: Subscription<Hasher, Event, Message>, +        recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,          receiver: Receiver,      ) -> Vec<BoxFuture<()>>      where @@ -70,8 +67,6 @@ where          use futures::stream::StreamExt;          let mut futures: Vec<BoxFuture<()>> = Vec::new(); - -        let recipes = subscription.recipes();          let mut alive = std::collections::HashSet::new();          for recipe in recipes { @@ -142,12 +137,12 @@ where      /// currently open.      ///      /// [`Recipe::stream`]: crate::subscription::Recipe::stream -    pub fn broadcast(&mut self, event: Event) { +    pub fn broadcast(&mut self, event: Event, status: event::Status) {          self.subscriptions              .values_mut()              .filter_map(|connection| connection.listener.as_mut())              .for_each(|listener| { -                if let Err(error) = listener.try_send(event.clone()) { +                if let Err(error) = listener.try_send((event.clone(), status)) {                      log::warn!(                          "Error sending event to subscription: {:?}",                          error @@ -156,13 +151,3 @@ where              });      }  } - -impl<Hasher, Event> Default for Tracker<Hasher, Event> -where -    Hasher: std::hash::Hasher + Default, -    Event: 'static + Send + Clone, -{ -    fn default() -> Self { -        Self::new() -    } -} | 
