summaryrefslogtreecommitdiffstats
path: root/futures/src/time.rs
diff options
context:
space:
mode:
Diffstat (limited to 'futures/src/time.rs')
-rw-r--r--futures/src/time.rs47
1 files changed, 43 insertions, 4 deletions
diff --git a/futures/src/time.rs b/futures/src/time.rs
index 5e9ea436..86b4a4e7 100644
--- a/futures/src/time.rs
+++ b/futures/src/time.rs
@@ -13,6 +13,33 @@ pub fn every<H: std::hash::Hasher, E>(
struct Every(std::time::Duration);
+#[cfg(all(
+ not(any(feature = "tokio_old", feature = "tokio", feature = "async-std")),
+ feature = "smol"
+))]
+impl<H, E> subscription::Recipe<H, E> for Every
+where
+ H: std::hash::Hasher,
+{
+ type Output = std::time::Instant;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<Self>().hash(state);
+ self.0.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ _input: futures::stream::BoxStream<'static, E>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::stream::StreamExt;
+
+ smol::Timer::interval(self.0).boxed()
+ }
+}
+
#[cfg(feature = "async-std")]
impl<H, E> subscription::Recipe<H, E> for Every
where
@@ -41,7 +68,7 @@ where
#[cfg(all(
any(feature = "tokio", feature = "tokio_old"),
- not(feature = "async-std")
+ not(any(feature = "async-std", feature = "smol"))
))]
impl<H, E> subscription::Recipe<H, E> for Every
where
@@ -67,8 +94,20 @@ where
let start = tokio::time::Instant::now() + self.0;
- tokio::time::interval_at(start, self.0)
- .map(|_| std::time::Instant::now())
- .boxed()
+ let stream = {
+ #[cfg(feature = "tokio")]
+ {
+ futures::stream::unfold(
+ tokio::time::interval_at(start, self.0),
+ |mut interval| async move {
+ Some((interval.tick().await, interval))
+ },
+ )
+ }
+ #[cfg(feature = "tokio_old")]
+ tokio::time::interval_at(start, self.0)
+ };
+
+ stream.map(tokio::time::Instant::into_std).boxed()
}
}