summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector@hecrj.dev>2025-01-24 16:38:56 +0100
committerLibravatar Héctor Ramón Jiménez <hector@hecrj.dev>2025-01-24 16:45:18 +0100
commit3a07c631add426a308607055d1b46d934f21e7f6 (patch)
treeb3562fa55b828d5cc64b41d10bd0820a26781613 /futures
parent75a6f32a5ef49bb8e6d16506d84b07822a33c41b (diff)
downloadiced-3a07c631add426a308607055d1b46d934f21e7f6.tar.gz
iced-3a07c631add426a308607055d1b46d934f21e7f6.tar.bz2
iced-3a07c631add426a308607055d1b46d934f21e7f6.zip
Implement `time::repeat` and simplify `Subscription::run_with`
Diffstat (limited to 'futures')
-rw-r--r--futures/src/backend/native/tokio.rs60
-rw-r--r--futures/src/subscription.rs26
2 files changed, 46 insertions, 40 deletions
diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs
index 9dc3593d..e0be83a6 100644
--- a/futures/src/backend/native/tokio.rs
+++ b/futures/src/backend/native/tokio.rs
@@ -22,40 +22,25 @@ impl crate::Executor for Executor {
pub mod time {
//! Listen and react to time.
- use crate::subscription::{self, Hasher, Subscription};
+ use crate::core::time::{Duration, Instant};
+ use crate::stream;
+ use crate::subscription::Subscription;
+ use crate::MaybeSend;
+
+ use futures::SinkExt;
+ use std::future::Future;
/// Returns a [`Subscription`] that produces messages at a set interval.
///
/// The first message is produced after a `duration`, and then continues to
/// produce more messages every `duration` after that.
- pub fn every(
- duration: std::time::Duration,
- ) -> Subscription<std::time::Instant> {
- subscription::from_recipe(Every(duration))
- }
-
- #[derive(Debug)]
- struct Every(std::time::Duration);
-
- impl subscription::Recipe for Every {
- type Output = std::time::Instant;
-
- fn hash(&self, state: &mut Hasher) {
- use std::hash::Hash;
-
- std::any::TypeId::of::<Self>().hash(state);
- self.0.hash(state);
- }
-
- fn stream(
- self: Box<Self>,
- _input: subscription::EventStream,
- ) -> futures::stream::BoxStream<'static, Self::Output> {
+ pub fn every(duration: Duration) -> Subscription<Instant> {
+ Subscription::run_with(duration, |duration| {
use futures::stream::StreamExt;
- let start = tokio::time::Instant::now() + self.0;
+ let start = tokio::time::Instant::now() + *duration;
- let mut interval = tokio::time::interval_at(start, self.0);
+ let mut interval = tokio::time::interval_at(start, *duration);
interval.set_missed_tick_behavior(
tokio::time::MissedTickBehavior::Skip,
);
@@ -67,6 +52,27 @@ pub mod time {
};
stream.map(tokio::time::Instant::into_std).boxed()
- }
+ })
+ }
+
+ /// Returns a [`Subscription`] that runs the given async function at a
+ /// set interval; producing the result of the function as output.
+ pub fn repeat<F, T>(f: fn() -> F, interval: Duration) -> Subscription<T>
+ where
+ F: Future<Output = T> + MaybeSend + 'static,
+ T: MaybeSend + 'static,
+ {
+ Subscription::run_with((f, interval), |(f, interval)| {
+ let f = *f;
+ let interval = *interval;
+
+ stream::channel(1, move |mut output| async move {
+ loop {
+ let _ = output.send(f().await).await;
+
+ tokio::time::sleep(interval).await;
+ }
+ })
+ })
}
}
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index eaea1a1f..82cba9a1 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -202,8 +202,8 @@ impl<T> Subscription<T> {
T: 'static,
{
from_recipe(Runner {
- id: builder,
- spawn: move |_| builder(),
+ data: builder,
+ spawn: |builder, _| builder(),
})
}
@@ -211,15 +211,15 @@ impl<T> Subscription<T> {
/// given [`Stream`].
///
/// The `id` will be used to uniquely identify the [`Subscription`].
- pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T>
+ pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
where
- I: Hash + 'static,
+ D: Hash + 'static,
S: Stream<Item = T> + MaybeSend + 'static,
T: 'static,
{
from_recipe(Runner {
- id,
- spawn: move |_| stream,
+ data: (data, builder),
+ spawn: |(data, builder), _| builder(data),
})
}
@@ -423,8 +423,8 @@ where
T: 'static + MaybeSend,
{
from_recipe(Runner {
- id,
- spawn: |events| {
+ data: id,
+ spawn: |_, events| {
use futures::future;
use futures::stream::StreamExt;
@@ -435,27 +435,27 @@ where
struct Runner<I, F, S, T>
where
- F: FnOnce(EventStream) -> S,
+ F: FnOnce(&I, EventStream) -> S,
S: Stream<Item = T>,
{
- id: I,
+ data: I,
spawn: F,
}
impl<I, F, S, T> Recipe for Runner<I, F, S, T>
where
I: Hash + 'static,
- F: FnOnce(EventStream) -> S,
+ F: FnOnce(&I, EventStream) -> S,
S: Stream<Item = T> + MaybeSend + 'static,
{
type Output = T;
fn hash(&self, state: &mut Hasher) {
std::any::TypeId::of::<I>().hash(state);
- self.id.hash(state);
+ self.data.hash(state);
}
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
- crate::boxed_stream((self.spawn)(input))
+ crate::boxed_stream((self.spawn)(&self.data, input))
}
}