diff options
Diffstat (limited to '')
| -rw-r--r-- | futures/src/backend/native/tokio.rs | 60 | 
1 files changed, 33 insertions, 27 deletions
| diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index 9dc3593d..e0be83a6 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,40 +22,25 @@ impl crate::Executor for Executor {  pub mod time {      //! Listen and react to time. -    use crate::subscription::{self, Hasher, Subscription}; +    use crate::core::time::{Duration, Instant}; +    use crate::stream; +    use crate::subscription::Subscription; +    use crate::MaybeSend; + +    use futures::SinkExt; +    use std::future::Future;      /// Returns a [`Subscription`] that produces messages at a set interval.      ///      /// The first message is produced after a `duration`, and then continues to      /// produce more messages every `duration` after that. -    pub fn every( -        duration: std::time::Duration, -    ) -> Subscription<std::time::Instant> { -        subscription::from_recipe(Every(duration)) -    } - -    #[derive(Debug)] -    struct Every(std::time::Duration); - -    impl subscription::Recipe for Every { -        type Output = std::time::Instant; - -        fn hash(&self, state: &mut Hasher) { -            use std::hash::Hash; - -            std::any::TypeId::of::<Self>().hash(state); -            self.0.hash(state); -        } - -        fn stream( -            self: Box<Self>, -            _input: subscription::EventStream, -        ) -> futures::stream::BoxStream<'static, Self::Output> { +    pub fn every(duration: Duration) -> Subscription<Instant> { +        Subscription::run_with(duration, |duration| {              use futures::stream::StreamExt; -            let start = tokio::time::Instant::now() + self.0; +            let start = tokio::time::Instant::now() + *duration; -            let mut interval = tokio::time::interval_at(start, self.0); +            let mut interval = tokio::time::interval_at(start, *duration);              interval.set_missed_tick_behavior(                  tokio::time::MissedTickBehavior::Skip,              ); @@ -67,6 +52,27 @@ pub mod time {              };              stream.map(tokio::time::Instant::into_std).boxed() -        } +        }) +    } + +    /// Returns a [`Subscription`] that runs the given async function at a +    /// set interval; producing the result of the function as output. +    pub fn repeat<F, T>(f: fn() -> F, interval: Duration) -> Subscription<T> +    where +        F: Future<Output = T> + MaybeSend + 'static, +        T: MaybeSend + 'static, +    { +        Subscription::run_with((f, interval), |(f, interval)| { +            let f = *f; +            let interval = *interval; + +            stream::channel(1, move |mut output| async move { +                loop { +                    let _ = output.send(f().await).await; + +                    tokio::time::sleep(interval).await; +                } +            }) +        })      }  } | 
