From ddbbe7353bce0827160cb8d539a3114c159b3745 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 17 Jan 2022 15:29:41 +0700 Subject: Simplify `run` and `unfold` helpers to build a `Subscription` --- native/src/subscription.rs | 43 +++++++++++++++++-------------------------- 1 file changed, 17 insertions(+), 26 deletions(-) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 70cd269e..ece6556e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -60,8 +60,7 @@ where { Subscription::from_recipe(Runner { id: f, - initial: (), - spawn: move |_, events| { + spawn: move |events| { use futures::future; use futures::stream::StreamExt; @@ -73,31 +72,25 @@ where } /// Returns a [`Subscription`] that will create and asynchronously run the -/// [`Stream`] returned by the provided closure. +/// given [`Stream`]. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( - id: I, - initial: T, - f: impl FnOnce(T) -> S + 'static, -) -> Subscription +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run(id: I, stream: S) -> Subscription where I: Hash + 'static, - T: 'static, S: Stream + Send + 'static, Message: 'static, { Subscription::from_recipe(Runner { id, - initial, - spawn: move |initial, _| f(initial), + spawn: move |_| stream, }) } /// Returns a [`Subscription`] that will create and asynchronously run a /// [`Stream`] that will call the provided closure to produce every `Message`. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. +/// The `id` will be used to uniquely identify the [`Subscription`]. pub fn unfold( id: I, initial: T, @@ -112,41 +105,39 @@ where use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(id, initial, move |initial| { + run( + id, futures::stream::unfold(initial, move |state| f(state).map(Some)) - .filter_map(future::ready) - }) + .filter_map(future::ready), + ) } -struct Runner +struct Runner where - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream, { id: I, - initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where I: Hash + 'static, - T: 'static, - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream + Send + 'static, { type Output = Message; fn hash(&self, state: &mut Hasher) { - std::any::TypeId::of::().hash(state); - + std::any::TypeId::of::().hash(state); self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { use futures::stream::StreamExt; - (self.spawn)(self.initial, input).boxed() + (self.spawn)(input).boxed() } } -- cgit