//! Generate events asynchronously for you application. /// An event subscription. pub struct Subscription { recipes: Vec>>, } impl Subscription where H: std::hash::Hasher, { pub fn none() -> Self { Self { recipes: Vec::new(), } } pub fn from_recipe( recipe: impl Recipe + 'static, ) -> Self { Self { recipes: vec![Box::new(recipe)], } } pub fn batch( subscriptions: impl Iterator>, ) -> Self { Self { recipes: subscriptions .flat_map(|subscription| subscription.recipes) .collect(), } } pub fn recipes(self) -> Vec>> { self.recipes } pub fn map( mut self, f: impl Fn(O) -> A + Send + Sync + 'static, ) -> Subscription where H: 'static, I: 'static, O: 'static, A: 'static, { let function = std::sync::Arc::new(f); Subscription { recipes: self .recipes .drain(..) .map(|recipe| { Box::new(Map::new(recipe, function.clone())) as Box> }) .collect(), } } } impl std::fmt::Debug for Subscription { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Subscription").finish() } } /// The connection of an event subscription. pub trait Recipe { type Output; fn hash(&self, state: &mut Hasher); fn stream( self: Box, input: Input, ) -> futures::stream::BoxStream<'static, Self::Output>; } struct Map { recipe: Box>, mapper: std::sync::Arc B + Send + Sync>, } impl Map { fn new( recipe: Box>, mapper: std::sync::Arc B + Send + Sync + 'static>, ) -> Self { Map { recipe, mapper } } } impl Recipe for Map where A: 'static, B: 'static, H: std::hash::Hasher, { type Output = B; fn hash(&self, state: &mut H) { use std::hash::Hash; std::any::TypeId::of::().hash(state); self.recipe.hash(state); } fn stream( self: Box, input: I, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::StreamExt; let mapper = self.mapper; self.recipe .stream(input) .map(move |element| mapper(element)) .boxed() } }