diff options
Diffstat (limited to 'futures')
| -rw-r--r-- | futures/src/subscription.rs | 70 | 
1 files changed, 70 insertions, 0 deletions
| diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index b68444cd..8eccb7be 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -72,6 +72,34 @@ where          self.recipes      } +    /// Adds a value to the [`Subscription`] context. +    /// +    /// The value will be part of the identity of a [`Subscription`]. +    /// +    /// This is necessary if you want to use multiple instances of the same +    /// [`Subscription`] to produce different kinds of messages based on some +    /// external data. +    /// +    /// [`Subscription`]: struct.Subscription.html +    pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)> +    where +        H: 'static, +        E: 'static, +        O: 'static, +        T: std::hash::Hash + Clone + Send + Sync + 'static, +    { +        Subscription { +            recipes: self +                .recipes +                .drain(..) +                .map(|recipe| { +                    Box::new(With::new(recipe, value.clone())) +                        as Box<dyn Recipe<H, E, Output = (T, O)>> +                }) +                .collect(), +        } +    } +      /// Transforms the [`Subscription`] output with the given function.      ///      /// [`Subscription`]: struct.Subscription.html @@ -187,3 +215,45 @@ where              .boxed()      }  } + +struct With<Hasher, Event, A, B> { +    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, +    value: B, +} + +impl<H, E, A, B> With<H, E, A, B> { +    fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self { +        With { recipe, value } +    } +} + +impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B> +where +    A: 'static, +    B: 'static + std::hash::Hash + Clone + Send + Sync, +    H: std::hash::Hasher, +{ +    type Output = (B, A); + +    fn hash(&self, state: &mut H) { +        use std::hash::Hash; + +        std::any::TypeId::of::<B>().hash(state); +        self.value.hash(state); +        self.recipe.hash(state); +    } + +    fn stream( +        self: Box<Self>, +        input: BoxStream<'static, E>, +    ) -> futures::stream::BoxStream<'static, Self::Output> { +        use futures::StreamExt; + +        let value = self.value; + +        self.recipe +            .stream(input) +            .map(move |element| (value.clone(), element)) +            .boxed() +    } +} | 
