From 3a07c631add426a308607055d1b46d934f21e7f6 Mon Sep 17 00:00:00 2001
From: Héctor Ramón Jiménez <hector@hecrj.dev>
Date: Fri, 24 Jan 2025 16:38:56 +0100
Subject: Implement `time::repeat` and simplify `Subscription::run_with`

---
 futures/src/subscription.rs | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

(limited to 'futures/src/subscription.rs')

diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index eaea1a1f..82cba9a1 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -202,8 +202,8 @@ impl<T> Subscription<T> {
         T: 'static,
     {
         from_recipe(Runner {
-            id: builder,
-            spawn: move |_| builder(),
+            data: builder,
+            spawn: |builder, _| builder(),
         })
     }
 
@@ -211,15 +211,15 @@ impl<T> Subscription<T> {
     /// given [`Stream`].
     ///
     /// The `id` will be used to uniquely identify the [`Subscription`].
-    pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T>
+    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
     where
-        I: Hash + 'static,
+        D: Hash + 'static,
         S: Stream<Item = T> + MaybeSend + 'static,
         T: 'static,
     {
         from_recipe(Runner {
-            id,
-            spawn: move |_| stream,
+            data: (data, builder),
+            spawn: |(data, builder), _| builder(data),
         })
     }
 
@@ -423,8 +423,8 @@ where
     T: 'static + MaybeSend,
 {
     from_recipe(Runner {
-        id,
-        spawn: |events| {
+        data: id,
+        spawn: |_, events| {
             use futures::future;
             use futures::stream::StreamExt;
 
@@ -435,27 +435,27 @@ where
 
 struct Runner<I, F, S, T>
 where
-    F: FnOnce(EventStream) -> S,
+    F: FnOnce(&I, EventStream) -> S,
     S: Stream<Item = T>,
 {
-    id: I,
+    data: I,
     spawn: F,
 }
 
 impl<I, F, S, T> Recipe for Runner<I, F, S, T>
 where
     I: Hash + 'static,
-    F: FnOnce(EventStream) -> S,
+    F: FnOnce(&I, EventStream) -> S,
     S: Stream<Item = T> + MaybeSend + 'static,
 {
     type Output = T;
 
     fn hash(&self, state: &mut Hasher) {
         std::any::TypeId::of::<I>().hash(state);
-        self.id.hash(state);
+        self.data.hash(state);
     }
 
     fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
-        crate::boxed_stream((self.spawn)(input))
+        crate::boxed_stream((self.spawn)(&self.data, input))
     }
 }
-- 
cgit