summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
Diffstat (limited to 'futures')
-rw-r--r--futures/src/subscription.rs70
1 files changed, 70 insertions, 0 deletions
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index b68444cd..8eccb7be 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -72,6 +72,34 @@ where
self.recipes
}
+ /// Adds a value to the [`Subscription`] context.
+ ///
+ /// The value will be part of the identity of a [`Subscription`].
+ ///
+ /// This is necessary if you want to use multiple instances of the same
+ /// [`Subscription`] to produce different kinds of messages based on some
+ /// external data.
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)>
+ where
+ H: 'static,
+ E: 'static,
+ O: 'static,
+ T: std::hash::Hash + Clone + Send + Sync + 'static,
+ {
+ Subscription {
+ recipes: self
+ .recipes
+ .drain(..)
+ .map(|recipe| {
+ Box::new(With::new(recipe, value.clone()))
+ as Box<dyn Recipe<H, E, Output = (T, O)>>
+ })
+ .collect(),
+ }
+ }
+
/// Transforms the [`Subscription`] output with the given function.
///
/// [`Subscription`]: struct.Subscription.html
@@ -187,3 +215,45 @@ where
.boxed()
}
}
+
+struct With<Hasher, Event, A, B> {
+ recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
+ value: B,
+}
+
+impl<H, E, A, B> With<H, E, A, B> {
+ fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self {
+ With { recipe, value }
+ }
+}
+
+impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B>
+where
+ A: 'static,
+ B: 'static + std::hash::Hash + Clone + Send + Sync,
+ H: std::hash::Hasher,
+{
+ type Output = (B, A);
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<B>().hash(state);
+ self.value.hash(state);
+ self.recipe.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ input: BoxStream<'static, E>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::StreamExt;
+
+ let value = self.value;
+
+ self.recipe
+ .stream(input)
+ .map(move |element| (value.clone(), element))
+ .boxed()
+ }
+}