From dc50a2830ab553dfc5dfc28d4fd1af6b3981c656 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 16 Jan 2022 15:50:19 +0700 Subject: Draft `websocket` example :tada: --- native/src/subscription.rs | 98 ++++++++++++---------------------------------- 1 file changed, 25 insertions(+), 73 deletions(-) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 85ad96fa..70cd269e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,6 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::channel::mpsc; use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; @@ -59,21 +58,9 @@ pub fn events_with( where Message: 'static + Send, { - #[derive(Debug, Clone, Copy, Hash)] - struct Events(u64); - - let hash = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - - f.hash(&mut hasher); - - hasher.finish() - }; - Subscription::from_recipe(Runner { - initial: Events(hash), + id: f, + initial: (), spawn: move |_, events| { use futures::future; use futures::stream::StreamExt; @@ -89,16 +76,19 @@ where /// [`Stream`] returned by the provided closure. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( +pub fn run( + id: I, initial: T, f: impl FnOnce(T) -> S + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, S: Stream + Send + 'static, + Message: 'static, { Subscription::from_recipe(Runner { + id, initial, spawn: move |initial, _| f(initial), }) @@ -108,79 +98,41 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn unfold( +pub fn unfold( + id: I, initial: T, mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, -{ - use futures::future::FutureExt; - - run(initial, move |initial| { - futures::stream::unfold(initial, move |state| f(state).map(Some)) - }) -} - -/// Returns a [`Subscription`] that will open a channel and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// When the [`Subscription`] starts, an `on_ready` message will be produced -/// containing the [`mpsc::Sender`] end of the channel, which can be used by -/// the parent application to send `Input` to the running [`Subscription`]. -/// -/// The provided closure should use the [`mpsc::Receiver`] argument to await for -/// any `Input`. -/// -/// This function is really useful to create asynchronous workers with -/// bidirectional communication with a parent application. -/// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn worker( - initial: T, - on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, - f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, -) -> Subscription -where - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, - Message: Send + 'static, - Input: Send + 'static, + I: Hash + 'static, + T: Send + 'static, + Fut: Future, T)> + Send + 'static, + Message: 'static + Send, { - use futures::future; + use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(initial, move |initial| { - let (sender, receiver) = mpsc::channel(100); - - futures::stream::once(future::ready(on_ready(sender))).chain( - futures::stream::unfold( - (f, initial, receiver), - move |(mut f, state, mut receiver)| async { - let (message, state) = f(state, &mut receiver).await; - - Some((message, (f, state, receiver))) - }, - ), - ) + run(id, initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready) }) } -struct Runner +struct Runner where F: FnOnce(T, EventStream) -> S, S: Stream, { + id: I, initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, F: FnOnce(T, EventStream) -> S, S: Stream + Send + 'static, { @@ -189,7 +141,7 @@ where fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::().hash(state); - self.initial.hash(state); + self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { -- cgit