diff options
author | 2025-01-24 16:38:56 +0100 | |
---|---|---|
committer | 2025-01-24 16:45:18 +0100 | |
commit | 3a07c631add426a308607055d1b46d934f21e7f6 (patch) | |
tree | b3562fa55b828d5cc64b41d10bd0820a26781613 /futures/src/backend/native/tokio.rs | |
parent | 75a6f32a5ef49bb8e6d16506d84b07822a33c41b (diff) | |
download | iced-3a07c631add426a308607055d1b46d934f21e7f6.tar.gz iced-3a07c631add426a308607055d1b46d934f21e7f6.tar.bz2 iced-3a07c631add426a308607055d1b46d934f21e7f6.zip |
Implement `time::repeat` and simplify `Subscription::run_with`
Diffstat (limited to 'futures/src/backend/native/tokio.rs')
-rw-r--r-- | futures/src/backend/native/tokio.rs | 60 |
1 files changed, 33 insertions, 27 deletions
diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index 9dc3593d..e0be83a6 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,40 +22,25 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. - use crate::subscription::{self, Hasher, Subscription}; + use crate::core::time::{Duration, Instant}; + use crate::stream; + use crate::subscription::Subscription; + use crate::MaybeSend; + + use futures::SinkExt; + use std::future::Future; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( - duration: std::time::Duration, - ) -> Subscription<std::time::Instant> { - subscription::from_recipe(Every(duration)) - } - - #[derive(Debug)] - struct Every(std::time::Duration); - - impl subscription::Recipe for Every { - type Output = std::time::Instant; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - std::any::TypeId::of::<Self>().hash(state); - self.0.hash(state); - } - - fn stream( - self: Box<Self>, - _input: subscription::EventStream, - ) -> futures::stream::BoxStream<'static, Self::Output> { + pub fn every(duration: Duration) -> Subscription<Instant> { + Subscription::run_with(duration, |duration| { use futures::stream::StreamExt; - let start = tokio::time::Instant::now() + self.0; + let start = tokio::time::Instant::now() + *duration; - let mut interval = tokio::time::interval_at(start, self.0); + let mut interval = tokio::time::interval_at(start, *duration); interval.set_missed_tick_behavior( tokio::time::MissedTickBehavior::Skip, ); @@ -67,6 +52,27 @@ pub mod time { }; stream.map(tokio::time::Instant::into_std).boxed() - } + }) + } + + /// Returns a [`Subscription`] that runs the given async function at a + /// set interval; producing the result of the function as output. + pub fn repeat<F, T>(f: fn() -> F, interval: Duration) -> Subscription<T> + where + F: Future<Output = T> + MaybeSend + 'static, + T: MaybeSend + 'static, + { + Subscription::run_with((f, interval), |(f, interval)| { + let f = *f; + let interval = *interval; + + stream::channel(1, move |mut output| async move { + loop { + let _ = output.send(f().await).await; + + tokio::time::sleep(interval).await; + } + }) + }) } } |