diff options
Diffstat (limited to 'futures/src/time.rs')
-rw-r--r-- | futures/src/time.rs | 55 |
1 files changed, 49 insertions, 6 deletions
diff --git a/futures/src/time.rs b/futures/src/time.rs index e87b4a83..86b4a4e7 100644 --- a/futures/src/time.rs +++ b/futures/src/time.rs @@ -5,8 +5,6 @@ use crate::subscription::{self, Subscription}; /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. -/// -/// [`Subscription`]: ../subscription/struct.Subscription.html pub fn every<H: std::hash::Hasher, E>( duration: std::time::Duration, ) -> Subscription<H, E, std::time::Instant> { @@ -15,6 +13,33 @@ pub fn every<H: std::hash::Hasher, E>( struct Every(std::time::Duration); +#[cfg(all( + not(any(feature = "tokio_old", feature = "tokio", feature = "async-std")), + feature = "smol" +))] +impl<H, E> subscription::Recipe<H, E> for Every +where + H: std::hash::Hasher, +{ + type Output = std::time::Instant; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::<Self>().hash(state); + self.0.hash(state); + } + + fn stream( + self: Box<Self>, + _input: futures::stream::BoxStream<'static, E>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::stream::StreamExt; + + smol::Timer::interval(self.0).boxed() + } +} + #[cfg(feature = "async-std")] impl<H, E> subscription::Recipe<H, E> for Every where @@ -41,7 +66,10 @@ where } } -#[cfg(all(feature = "tokio", not(feature = "async-std")))] +#[cfg(all( + any(feature = "tokio", feature = "tokio_old"), + not(any(feature = "async-std", feature = "smol")) +))] impl<H, E> subscription::Recipe<H, E> for Every where H: std::hash::Hasher, @@ -61,10 +89,25 @@ where ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; + #[cfg(feature = "tokio_old")] + use tokio_old as tokio; + let start = tokio::time::Instant::now() + self.0; - tokio::time::interval_at(start, self.0) - .map(|_| std::time::Instant::now()) - .boxed() + let stream = { + #[cfg(feature = "tokio")] + { + futures::stream::unfold( + tokio::time::interval_at(start, self.0), + |mut interval| async move { + Some((interval.tick().await, interval)) + }, + ) + } + #[cfg(feature = "tokio_old")] + tokio::time::interval_at(start, self.0) + }; + + stream.map(tokio::time::Instant::into_std).boxed() } } |