diff options
author | 2022-01-16 15:50:19 +0700 | |
---|---|---|
committer | 2022-01-16 15:50:19 +0700 | |
commit | dc50a2830ab553dfc5dfc28d4fd1af6b3981c656 (patch) | |
tree | 4ff8bce15f88a0ca610551fdd81afa968f7fda58 /native | |
parent | 2f557731f313ccc94b5d0ebb4ee5603624670daf (diff) | |
download | iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.gz iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.bz2 iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.zip |
Draft `websocket` example :tada:
Diffstat (limited to 'native')
-rw-r--r-- | native/src/subscription.rs | 98 |
1 files changed, 25 insertions, 73 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 85ad96fa..70cd269e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,6 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::channel::mpsc; use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; @@ -59,21 +58,9 @@ pub fn events_with<Message>( where Message: 'static + Send, { - #[derive(Debug, Clone, Copy, Hash)] - struct Events(u64); - - let hash = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - - f.hash(&mut hasher); - - hasher.finish() - }; - Subscription::from_recipe(Runner { - initial: Events(hash), + id: f, + initial: (), spawn: move |_, events| { use futures::future; use futures::stream::StreamExt; @@ -89,16 +76,19 @@ where /// [`Stream`] returned by the provided closure. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run<T, S, Message>( +pub fn run<I, T, S, Message>( + id: I, initial: T, f: impl FnOnce(T) -> S + 'static, ) -> Subscription<Message> where - Message: 'static, - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, S: Stream<Item = Message> + Send + 'static, + Message: 'static, { Subscription::from_recipe(Runner { + id, initial, spawn: move |initial, _| f(initial), }) @@ -108,79 +98,41 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn unfold<T, Fut, Message>( +pub fn unfold<I, T, Fut, Message>( + id: I, initial: T, mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, ) -> Subscription<Message> where - Message: 'static, - T: Clone + Hash + Send + 'static, - Fut: Future<Output = (Message, T)> + Send + 'static, -{ - use futures::future::FutureExt; - - run(initial, move |initial| { - futures::stream::unfold(initial, move |state| f(state).map(Some)) - }) -} - -/// Returns a [`Subscription`] that will open a channel and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// When the [`Subscription`] starts, an `on_ready` message will be produced -/// containing the [`mpsc::Sender`] end of the channel, which can be used by -/// the parent application to send `Input` to the running [`Subscription`]. -/// -/// The provided closure should use the [`mpsc::Receiver`] argument to await for -/// any `Input`. -/// -/// This function is really useful to create asynchronous workers with -/// bidirectional communication with a parent application. -/// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn worker<T, Fut, Message, Input>( - initial: T, - on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static, - f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static, -) -> Subscription<Message> -where - T: Clone + Hash + Send + 'static, - Fut: Future<Output = (Message, T)> + Send + 'static, - Message: Send + 'static, - Input: Send + 'static, + I: Hash + 'static, + T: Send + 'static, + Fut: Future<Output = (Option<Message>, T)> + Send + 'static, + Message: 'static + Send, { - use futures::future; + use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(initial, move |initial| { - let (sender, receiver) = mpsc::channel(100); - - futures::stream::once(future::ready(on_ready(sender))).chain( - futures::stream::unfold( - (f, initial, receiver), - move |(mut f, state, mut receiver)| async { - let (message, state) = f(state, &mut receiver).await; - - Some((message, (f, state, receiver))) - }, - ), - ) + run(id, initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready) }) } -struct Runner<T, F, S, Message> +struct Runner<I, T, F, S, Message> where F: FnOnce(T, EventStream) -> S, S: Stream<Item = Message>, { + id: I, initial: T, spawn: F, } -impl<T, S, F, Message> Recipe<Hasher, (Event, event::Status)> - for Runner<T, F, S, Message> +impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)> + for Runner<I, T, F, S, Message> where - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, F: FnOnce(T, EventStream) -> S, S: Stream<Item = Message> + Send + 'static, { @@ -189,7 +141,7 @@ where fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::<T>().hash(state); - self.initial.hash(state); + self.id.hash(state); } fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { |