diff options
| author | 2025-01-24 18:59:44 +0100 | |
|---|---|---|
| committer | 2025-01-24 18:59:44 +0100 | |
| commit | 5eedf5798c0fcd92cfd3f304ce9454bb6c274f09 (patch) | |
| tree | 0d8d28ef8c062ea1a1651b77531ed091dfe5e561 /futures | |
| parent | 654c4b37148bda655f0c6676845d2b4ead801f40 (diff) | |
| parent | 3d893ae01b64eb51b2f5854949c694090118e052 (diff) | |
| download | iced-5eedf5798c0fcd92cfd3f304ce9454bb6c274f09.tar.gz iced-5eedf5798c0fcd92cfd3f304ce9454bb6c274f09.tar.bz2 iced-5eedf5798c0fcd92cfd3f304ce9454bb6c274f09.zip | |
Merge pull request #2747 from iced-rs/time-repeat-subscription
Implement `time::repeat` and simplify `Subscription::run_with`
Diffstat (limited to '')
| -rw-r--r-- | futures/src/backend/native/tokio.rs | 60 | ||||
| -rw-r--r-- | futures/src/subscription.rs | 26 | 
2 files changed, 46 insertions, 40 deletions
| diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index 9dc3593d..e0be83a6 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,40 +22,25 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. -    use crate::subscription::{self, Hasher, Subscription}; +    use crate::core::time::{Duration, Instant}; +    use crate::stream; +    use crate::subscription::Subscription; +    use crate::MaybeSend; + +    use futures::SinkExt; +    use std::future::Future;      /// Returns a [`Subscription`] that produces messages at a set interval.      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every( -        duration: std::time::Duration, -    ) -> Subscription<std::time::Instant> { -        subscription::from_recipe(Every(duration)) -    } - -    #[derive(Debug)] -    struct Every(std::time::Duration); - -    impl subscription::Recipe for Every { -        type Output = std::time::Instant; - -        fn hash(&self, state: &mut Hasher) { -            use std::hash::Hash; - -            std::any::TypeId::of::<Self>().hash(state); -            self.0.hash(state); -        } - -        fn stream( -            self: Box<Self>, -            _input: subscription::EventStream, -        ) -> futures::stream::BoxStream<'static, Self::Output> { +    pub fn every(duration: Duration) -> Subscription<Instant> { +        Subscription::run_with(duration, |duration| {              use futures::stream::StreamExt; -            let start = tokio::time::Instant::now() + self.0; +            let start = tokio::time::Instant::now() + *duration; -            let mut interval = tokio::time::interval_at(start, self.0); +            let mut interval = tokio::time::interval_at(start, *duration);              interval.set_missed_tick_behavior(                  tokio::time::MissedTickBehavior::Skip,              ); @@ -67,6 +52,27 @@ pub mod time {              };              stream.map(tokio::time::Instant::into_std).boxed() -        } +        }) +    } + +    /// Returns a [`Subscription`] that runs the given async function at a +    /// set interval; producing the result of the function as output. +    pub fn repeat<F, T>(f: fn() -> F, interval: Duration) -> Subscription<T> +    where +        F: Future<Output = T> + MaybeSend + 'static, +        T: MaybeSend + 'static, +    { +        Subscription::run_with((f, interval), |(f, interval)| { +            let f = *f; +            let interval = *interval; + +            stream::channel(1, move |mut output| async move { +                loop { +                    let _ = output.send(f().await).await; + +                    tokio::time::sleep(interval).await; +                } +            }) +        })      }  } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index eaea1a1f..82cba9a1 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -202,8 +202,8 @@ impl<T> Subscription<T> {          T: 'static,      {          from_recipe(Runner { -            id: builder, -            spawn: move |_| builder(), +            data: builder, +            spawn: |builder, _| builder(),          })      } @@ -211,15 +211,15 @@ impl<T> Subscription<T> {      /// given [`Stream`].      ///      /// The `id` will be used to uniquely identify the [`Subscription`]. -    pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T> +    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self      where -        I: Hash + 'static, +        D: Hash + 'static,          S: Stream<Item = T> + MaybeSend + 'static,          T: 'static,      {          from_recipe(Runner { -            id, -            spawn: move |_| stream, +            data: (data, builder), +            spawn: |(data, builder), _| builder(data),          })      } @@ -423,8 +423,8 @@ where      T: 'static + MaybeSend,  {      from_recipe(Runner { -        id, -        spawn: |events| { +        data: id, +        spawn: |_, events| {              use futures::future;              use futures::stream::StreamExt; @@ -435,27 +435,27 @@ where  struct Runner<I, F, S, T>  where -    F: FnOnce(EventStream) -> S, +    F: FnOnce(&I, EventStream) -> S,      S: Stream<Item = T>,  { -    id: I, +    data: I,      spawn: F,  }  impl<I, F, S, T> Recipe for Runner<I, F, S, T>  where      I: Hash + 'static, -    F: FnOnce(EventStream) -> S, +    F: FnOnce(&I, EventStream) -> S,      S: Stream<Item = T> + MaybeSend + 'static,  {      type Output = T;      fn hash(&self, state: &mut Hasher) {          std::any::TypeId::of::<I>().hash(state); -        self.id.hash(state); +        self.data.hash(state);      }      fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { -        crate::boxed_stream((self.spawn)(input)) +        crate::boxed_stream((self.spawn)(&self.data, input))      }  } | 
