From a6531c840b97b1d30af5153c01fda69d09f43a08 Mon Sep 17 00:00:00 2001
From: Héctor Ramón Jiménez <hector0193@gmail.com>
Date: Thu, 5 Mar 2020 02:08:53 +0100
Subject: Implement `Subscription::with`

---
 examples/panes/src/main.rs  | 13 +++++----
 futures/src/subscription.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/examples/panes/src/main.rs b/examples/panes/src/main.rs
index 50b21fc5..b34ce205 100644
--- a/examples/panes/src/main.rs
+++ b/examples/panes/src/main.rs
@@ -77,8 +77,6 @@ impl Application for Launcher {
             }
         }
 
-        dbg!(self);
-
         Command::none()
     }
 
@@ -88,11 +86,14 @@ impl Application for Launcher {
                 match example {
                     Example::Clock(clock) => clock
                         .subscription()
-                        .map(move |message| Message::Clock(pane, message)),
+                        .with(pane)
+                        .map(|(pane, message)| Message::Clock(pane, message)),
 
-                    Example::Stopwatch(stopwatch) => stopwatch
-                        .subscription()
-                        .map(move |message| Message::Stopwatch(pane, message)),
+                    Example::Stopwatch(stopwatch) => {
+                        stopwatch.subscription().with(pane).map(
+                            |(pane, message)| Message::Stopwatch(pane, message),
+                        )
+                    }
                 }
             }));
 
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index b68444cd..8eccb7be 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -72,6 +72,34 @@ where
         self.recipes
     }
 
+    /// Adds a value to the [`Subscription`] context.
+    ///
+    /// The value will be part of the identity of a [`Subscription`].
+    ///
+    /// This is necessary if you want to use multiple instances of the same
+    /// [`Subscription`] to produce different kinds of messages based on some
+    /// external data.
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)>
+    where
+        H: 'static,
+        E: 'static,
+        O: 'static,
+        T: std::hash::Hash + Clone + Send + Sync + 'static,
+    {
+        Subscription {
+            recipes: self
+                .recipes
+                .drain(..)
+                .map(|recipe| {
+                    Box::new(With::new(recipe, value.clone()))
+                        as Box<dyn Recipe<H, E, Output = (T, O)>>
+                })
+                .collect(),
+        }
+    }
+
     /// Transforms the [`Subscription`] output with the given function.
     ///
     /// [`Subscription`]: struct.Subscription.html
@@ -187,3 +215,45 @@ where
             .boxed()
     }
 }
+
+struct With<Hasher, Event, A, B> {
+    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
+    value: B,
+}
+
+impl<H, E, A, B> With<H, E, A, B> {
+    fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self {
+        With { recipe, value }
+    }
+}
+
+impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B>
+where
+    A: 'static,
+    B: 'static + std::hash::Hash + Clone + Send + Sync,
+    H: std::hash::Hasher,
+{
+    type Output = (B, A);
+
+    fn hash(&self, state: &mut H) {
+        use std::hash::Hash;
+
+        std::any::TypeId::of::<B>().hash(state);
+        self.value.hash(state);
+        self.recipe.hash(state);
+    }
+
+    fn stream(
+        self: Box<Self>,
+        input: BoxStream<'static, E>,
+    ) -> futures::stream::BoxStream<'static, Self::Output> {
+        use futures::StreamExt;
+
+        let value = self.value;
+
+        self.recipe
+            .stream(input)
+            .map(move |element| (value.clone(), element))
+            .boxed()
+    }
+}
-- 
cgit