From 3a07c631add426a308607055d1b46d934f21e7f6 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 24 Jan 2025 16:38:56 +0100 Subject: Implement `time::repeat` and simplify `Subscription::run_with` --- futures/src/backend/native/tokio.rs | 60 ++++++++++++++++++++----------------- futures/src/subscription.rs | 26 ++++++++-------- 2 files changed, 46 insertions(+), 40 deletions(-) (limited to 'futures') diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index 9dc3593d..e0be83a6 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,40 +22,25 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. - use crate::subscription::{self, Hasher, Subscription}; + use crate::core::time::{Duration, Instant}; + use crate::stream; + use crate::subscription::Subscription; + use crate::MaybeSend; + + use futures::SinkExt; + use std::future::Future; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( - duration: std::time::Duration, - ) -> Subscription { - subscription::from_recipe(Every(duration)) - } - - #[derive(Debug)] - struct Every(std::time::Duration); - - impl subscription::Recipe for Every { - type Output = std::time::Instant; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - std::any::TypeId::of::().hash(state); - self.0.hash(state); - } - - fn stream( - self: Box, - _input: subscription::EventStream, - ) -> futures::stream::BoxStream<'static, Self::Output> { + pub fn every(duration: Duration) -> Subscription { + Subscription::run_with(duration, |duration| { use futures::stream::StreamExt; - let start = tokio::time::Instant::now() + self.0; + let start = tokio::time::Instant::now() + *duration; - let mut interval = tokio::time::interval_at(start, self.0); + let mut interval = tokio::time::interval_at(start, *duration); interval.set_missed_tick_behavior( tokio::time::MissedTickBehavior::Skip, ); @@ -67,6 +52,27 @@ pub mod time { }; stream.map(tokio::time::Instant::into_std).boxed() - } + }) + } + + /// Returns a [`Subscription`] that runs the given async function at a + /// set interval; producing the result of the function as output. + pub fn repeat(f: fn() -> F, interval: Duration) -> Subscription + where + F: Future + MaybeSend + 'static, + T: MaybeSend + 'static, + { + Subscription::run_with((f, interval), |(f, interval)| { + let f = *f; + let interval = *interval; + + stream::channel(1, move |mut output| async move { + loop { + let _ = output.send(f().await).await; + + tokio::time::sleep(interval).await; + } + }) + }) } } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index eaea1a1f..82cba9a1 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -202,8 +202,8 @@ impl Subscription { T: 'static, { from_recipe(Runner { - id: builder, - spawn: move |_| builder(), + data: builder, + spawn: |builder, _| builder(), }) } @@ -211,15 +211,15 @@ impl Subscription { /// given [`Stream`]. /// /// The `id` will be used to uniquely identify the [`Subscription`]. - pub fn run_with_id(id: I, stream: S) -> Subscription + pub fn run_with(data: D, builder: fn(&D) -> S) -> Self where - I: Hash + 'static, + D: Hash + 'static, S: Stream + MaybeSend + 'static, T: 'static, { from_recipe(Runner { - id, - spawn: move |_| stream, + data: (data, builder), + spawn: |(data, builder), _| builder(data), }) } @@ -423,8 +423,8 @@ where T: 'static + MaybeSend, { from_recipe(Runner { - id, - spawn: |events| { + data: id, + spawn: |_, events| { use futures::future; use futures::stream::StreamExt; @@ -435,27 +435,27 @@ where struct Runner where - F: FnOnce(EventStream) -> S, + F: FnOnce(&I, EventStream) -> S, S: Stream, { - id: I, + data: I, spawn: F, } impl Recipe for Runner where I: Hash + 'static, - F: FnOnce(EventStream) -> S, + F: FnOnce(&I, EventStream) -> S, S: Stream + MaybeSend + 'static, { type Output = T; fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::().hash(state); - self.id.hash(state); + self.data.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { - crate::boxed_stream((self.spawn)(input)) + crate::boxed_stream((self.spawn)(&self.data, input)) } } -- cgit