summaryrefslogtreecommitdiffstats
path: root/futures/src/time.rs
diff options
context:
space:
mode:
Diffstat (limited to 'futures/src/time.rs')
-rw-r--r--futures/src/time.rs55
1 files changed, 49 insertions, 6 deletions
diff --git a/futures/src/time.rs b/futures/src/time.rs
index e87b4a83..86b4a4e7 100644
--- a/futures/src/time.rs
+++ b/futures/src/time.rs
@@ -5,8 +5,6 @@ use crate::subscription::{self, Subscription};
///
/// The first message is produced after a `duration`, and then continues to
/// produce more messages every `duration` after that.
-///
-/// [`Subscription`]: ../subscription/struct.Subscription.html
pub fn every<H: std::hash::Hasher, E>(
duration: std::time::Duration,
) -> Subscription<H, E, std::time::Instant> {
@@ -15,6 +13,33 @@ pub fn every<H: std::hash::Hasher, E>(
struct Every(std::time::Duration);
+#[cfg(all(
+ not(any(feature = "tokio_old", feature = "tokio", feature = "async-std")),
+ feature = "smol"
+))]
+impl<H, E> subscription::Recipe<H, E> for Every
+where
+ H: std::hash::Hasher,
+{
+ type Output = std::time::Instant;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<Self>().hash(state);
+ self.0.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ _input: futures::stream::BoxStream<'static, E>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::stream::StreamExt;
+
+ smol::Timer::interval(self.0).boxed()
+ }
+}
+
#[cfg(feature = "async-std")]
impl<H, E> subscription::Recipe<H, E> for Every
where
@@ -41,7 +66,10 @@ where
}
}
-#[cfg(all(feature = "tokio", not(feature = "async-std")))]
+#[cfg(all(
+ any(feature = "tokio", feature = "tokio_old"),
+ not(any(feature = "async-std", feature = "smol"))
+))]
impl<H, E> subscription::Recipe<H, E> for Every
where
H: std::hash::Hasher,
@@ -61,10 +89,25 @@ where
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::stream::StreamExt;
+ #[cfg(feature = "tokio_old")]
+ use tokio_old as tokio;
+
let start = tokio::time::Instant::now() + self.0;
- tokio::time::interval_at(start, self.0)
- .map(|_| std::time::Instant::now())
- .boxed()
+ let stream = {
+ #[cfg(feature = "tokio")]
+ {
+ futures::stream::unfold(
+ tokio::time::interval_at(start, self.0),
+ |mut interval| async move {
+ Some((interval.tick().await, interval))
+ },
+ )
+ }
+ #[cfg(feature = "tokio_old")]
+ tokio::time::interval_at(start, self.0)
+ };
+
+ stream.map(tokio::time::Instant::into_std).boxed()
}
}