From a6531c840b97b1d30af5153c01fda69d09f43a08 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Thu, 5 Mar 2020 02:08:53 +0100 Subject: Implement `Subscription::with` --- futures/src/subscription.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) (limited to 'futures') diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index b68444cd..8eccb7be 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -72,6 +72,34 @@ where self.recipes } + /// Adds a value to the [`Subscription`] context. + /// + /// The value will be part of the identity of a [`Subscription`]. + /// + /// This is necessary if you want to use multiple instances of the same + /// [`Subscription`] to produce different kinds of messages based on some + /// external data. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn with(mut self, value: T) -> Subscription + where + H: 'static, + E: 'static, + O: 'static, + T: std::hash::Hash + Clone + Send + Sync + 'static, + { + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(With::new(recipe, value.clone())) + as Box> + }) + .collect(), + } + } + /// Transforms the [`Subscription`] output with the given function. /// /// [`Subscription`]: struct.Subscription.html @@ -187,3 +215,45 @@ where .boxed() } } + +struct With { + recipe: Box>, + value: B, +} + +impl With { + fn new(recipe: Box>, value: B) -> Self { + With { recipe, value } + } +} + +impl Recipe for With +where + A: 'static, + B: 'static + std::hash::Hash + Clone + Send + Sync, + H: std::hash::Hasher, +{ + type Output = (B, A); + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::().hash(state); + self.value.hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: BoxStream<'static, E>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::StreamExt; + + let value = self.value; + + self.recipe + .stream(input) + .map(move |element| (value.clone(), element)) + .boxed() + } +} -- cgit