diff options
author | 2025-01-24 16:38:56 +0100 | |
---|---|---|
committer | 2025-01-24 16:45:18 +0100 | |
commit | 3a07c631add426a308607055d1b46d934f21e7f6 (patch) | |
tree | b3562fa55b828d5cc64b41d10bd0820a26781613 | |
parent | 75a6f32a5ef49bb8e6d16506d84b07822a33c41b (diff) | |
download | iced-3a07c631add426a308607055d1b46d934f21e7f6.tar.gz iced-3a07c631add426a308607055d1b46d934f21e7f6.tar.bz2 iced-3a07c631add426a308607055d1b46d934f21e7f6.zip |
Implement `time::repeat` and simplify `Subscription::run_with`
-rw-r--r-- | futures/src/backend/native/tokio.rs | 60 | ||||
-rw-r--r-- | futures/src/subscription.rs | 26 | ||||
-rw-r--r-- | src/lib.rs | 4 |
3 files changed, 48 insertions, 42 deletions
diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index 9dc3593d..e0be83a6 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,40 +22,25 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. - use crate::subscription::{self, Hasher, Subscription}; + use crate::core::time::{Duration, Instant}; + use crate::stream; + use crate::subscription::Subscription; + use crate::MaybeSend; + + use futures::SinkExt; + use std::future::Future; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( - duration: std::time::Duration, - ) -> Subscription<std::time::Instant> { - subscription::from_recipe(Every(duration)) - } - - #[derive(Debug)] - struct Every(std::time::Duration); - - impl subscription::Recipe for Every { - type Output = std::time::Instant; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - std::any::TypeId::of::<Self>().hash(state); - self.0.hash(state); - } - - fn stream( - self: Box<Self>, - _input: subscription::EventStream, - ) -> futures::stream::BoxStream<'static, Self::Output> { + pub fn every(duration: Duration) -> Subscription<Instant> { + Subscription::run_with(duration, |duration| { use futures::stream::StreamExt; - let start = tokio::time::Instant::now() + self.0; + let start = tokio::time::Instant::now() + *duration; - let mut interval = tokio::time::interval_at(start, self.0); + let mut interval = tokio::time::interval_at(start, *duration); interval.set_missed_tick_behavior( tokio::time::MissedTickBehavior::Skip, ); @@ -67,6 +52,27 @@ pub mod time { }; stream.map(tokio::time::Instant::into_std).boxed() - } + }) + } + + /// Returns a [`Subscription`] that runs the given async function at a + /// set interval; producing the result of the function as output. + pub fn repeat<F, T>(f: fn() -> F, interval: Duration) -> Subscription<T> + where + F: Future<Output = T> + MaybeSend + 'static, + T: MaybeSend + 'static, + { + Subscription::run_with((f, interval), |(f, interval)| { + let f = *f; + let interval = *interval; + + stream::channel(1, move |mut output| async move { + loop { + let _ = output.send(f().await).await; + + tokio::time::sleep(interval).await; + } + }) + }) } } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index eaea1a1f..82cba9a1 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -202,8 +202,8 @@ impl<T> Subscription<T> { T: 'static, { from_recipe(Runner { - id: builder, - spawn: move |_| builder(), + data: builder, + spawn: |builder, _| builder(), }) } @@ -211,15 +211,15 @@ impl<T> Subscription<T> { /// given [`Stream`]. /// /// The `id` will be used to uniquely identify the [`Subscription`]. - pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T> + pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self where - I: Hash + 'static, + D: Hash + 'static, S: Stream<Item = T> + MaybeSend + 'static, T: 'static, { from_recipe(Runner { - id, - spawn: move |_| stream, + data: (data, builder), + spawn: |(data, builder), _| builder(data), }) } @@ -423,8 +423,8 @@ where T: 'static + MaybeSend, { from_recipe(Runner { - id, - spawn: |events| { + data: id, + spawn: |_, events| { use futures::future; use futures::stream::StreamExt; @@ -435,27 +435,27 @@ where struct Runner<I, F, S, T> where - F: FnOnce(EventStream) -> S, + F: FnOnce(&I, EventStream) -> S, S: Stream<Item = T>, { - id: I, + data: I, spawn: F, } impl<I, F, S, T> Recipe for Runner<I, F, S, T> where I: Hash + 'static, - F: FnOnce(EventStream) -> S, + F: FnOnce(&I, EventStream) -> S, S: Stream<Item = T> + MaybeSend + 'static, { type Output = T; fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::<I>().hash(state); - self.id.hash(state); + self.data.hash(state); } fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { - crate::boxed_stream((self.spawn)(input)) + crate::boxed_stream((self.spawn)(&self.data, input)) } } @@ -365,9 +365,9 @@ //! //! As with tasks, some modules expose convenient functions that build a [`Subscription`] for you—like //! [`time::every`] which can be used to listen to time, or [`keyboard::on_key_press`] which will notify you -//! of any key presses. But you can also create your own with [`Subscription::run`] and [`run_with_id`]. +//! of any key presses. But you can also create your own with [`Subscription::run`] and [`run_with`]. //! -//! [`run_with_id`]: Subscription::run_with_id +//! [`run_with`]: Subscription::run_with //! //! ## Scaling Applications //! The `update`, `view`, and `Message` triplet composes very nicely. |