diff options
author | 2022-01-17 15:29:41 +0700 | |
---|---|---|
committer | 2022-01-17 15:29:41 +0700 | |
commit | ddbbe7353bce0827160cb8d539a3114c159b3745 (patch) | |
tree | e73bbb4280f489770cca745985b860620e9911cc | |
parent | 88f1168a0b58759efb622c0215edd3d6b23bd059 (diff) | |
download | iced-ddbbe7353bce0827160cb8d539a3114c159b3745.tar.gz iced-ddbbe7353bce0827160cb8d539a3114c159b3745.tar.bz2 iced-ddbbe7353bce0827160cb8d539a3114c159b3745.zip |
Simplify `run` and `unfold` helpers to build a `Subscription`
-rw-r--r-- | examples/download_progress/src/download.rs | 122 | ||||
-rw-r--r-- | native/src/subscription.rs | 43 |
2 files changed, 69 insertions, 96 deletions
diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 20682e7a..7db1206b 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -1,22 +1,15 @@ -use futures::Stream; -use iced_futures::futures; use iced_native::subscription; use std::hash::Hash; // Just a little utility function -pub fn file<I: 'static + Hash + Copy + Send, T: ToString>( +pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>( id: I, url: T, ) -> iced::Subscription<(I, Progress)> { - subscription::run( - id, - Download { - id, - url: url.to_string(), - }, - download, - ) + subscription::unfold(id, State::Ready(url.to_string()), move |state| { + download(id, state) + }) } #[derive(Debug, Hash, Clone)] @@ -25,74 +18,63 @@ pub struct Download<I> { url: String, } -fn download<I: Copy>( - download: Download<I>, -) -> impl Stream<Item = (I, Progress)> { - let id = download.id; - - futures::stream::unfold( - State::Ready(download.url), - move |state| async move { - match state { - State::Ready(url) => { - let response = reqwest::get(&url).await; - - match response { - Ok(response) => { - if let Some(total) = response.content_length() { - Some(( - (id, Progress::Started), - State::Downloading { - response, - total, - downloaded: 0, - }, - )) - } else { - Some(((id, Progress::Errored), State::Finished)) - } - } - Err(_) => { - Some(((id, Progress::Errored), State::Finished)) - } - } - } - State::Downloading { - mut response, - total, - downloaded, - } => match response.chunk().await { - Ok(Some(chunk)) => { - let downloaded = downloaded + chunk.len() as u64; - - let percentage = - (downloaded as f32 / total as f32) * 100.0; +async fn download<I: Copy>( + id: I, + state: State, +) -> (Option<(I, Progress)>, State) { + match state { + State::Ready(url) => { + let response = reqwest::get(&url).await; - Some(( - (id, Progress::Advanced(percentage)), + match response { + Ok(response) => { + if let Some(total) = response.content_length() { + ( + Some((id, Progress::Started)), State::Downloading { response, total, - downloaded, + downloaded: 0, }, - )) + ) + } else { + (Some((id, Progress::Errored)), State::Finished) } - Ok(None) => { - Some(((id, Progress::Finished), State::Finished)) - } - Err(_) => Some(((id, Progress::Errored), State::Finished)), - }, - State::Finished => { - // We do not let the stream die, as it would start a - // new download repeatedly if the user is not careful - // in case of errors. - let _: () = iced::futures::future::pending().await; - - None } + Err(_) => (Some((id, Progress::Errored)), State::Finished), + } + } + State::Downloading { + mut response, + total, + downloaded, + } => match response.chunk().await { + Ok(Some(chunk)) => { + let downloaded = downloaded + chunk.len() as u64; + + let percentage = (downloaded as f32 / total as f32) * 100.0; + + ( + Some((id, Progress::Advanced(percentage))), + State::Downloading { + response, + total, + downloaded, + }, + ) } + Ok(None) => (Some((id, Progress::Finished)), State::Finished), + Err(_) => (Some((id, Progress::Errored)), State::Finished), }, - ) + State::Finished => { + // We do not let the stream die, as it would start a + // new download repeatedly if the user is not careful + // in case of errors. + let _: () = iced::futures::future::pending().await; + + unreachable!() + } + } } #[derive(Debug, Clone)] diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 70cd269e..ece6556e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -60,8 +60,7 @@ where { Subscription::from_recipe(Runner { id: f, - initial: (), - spawn: move |_, events| { + spawn: move |events| { use futures::future; use futures::stream::StreamExt; @@ -73,31 +72,25 @@ where } /// Returns a [`Subscription`] that will create and asynchronously run the -/// [`Stream`] returned by the provided closure. +/// given [`Stream`]. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run<I, T, S, Message>( - id: I, - initial: T, - f: impl FnOnce(T) -> S + 'static, -) -> Subscription<Message> +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message> where I: Hash + 'static, - T: 'static, S: Stream<Item = Message> + Send + 'static, Message: 'static, { Subscription::from_recipe(Runner { id, - initial, - spawn: move |initial, _| f(initial), + spawn: move |_| stream, }) } /// Returns a [`Subscription`] that will create and asynchronously run a /// [`Stream`] that will call the provided closure to produce every `Message`. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. +/// The `id` will be used to uniquely identify the [`Subscription`]. pub fn unfold<I, T, Fut, Message>( id: I, initial: T, @@ -112,41 +105,39 @@ where use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(id, initial, move |initial| { + run( + id, futures::stream::unfold(initial, move |state| f(state).map(Some)) - .filter_map(future::ready) - }) + .filter_map(future::ready), + ) } -struct Runner<I, T, F, S, Message> +struct Runner<I, F, S, Message> where - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream<Item = Message>, { id: I, - initial: T, spawn: F, } -impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)> - for Runner<I, T, F, S, Message> +impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)> + for Runner<I, F, S, Message> where I: Hash + 'static, - T: 'static, - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream<Item = Message> + Send + 'static, { type Output = Message; fn hash(&self, state: &mut Hasher) { - std::any::TypeId::of::<T>().hash(state); - + std::any::TypeId::of::<I>().hash(state); self.id.hash(state); } fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { use futures::stream::StreamExt; - (self.spawn)(self.initial, input).boxed() + (self.spawn)(input).boxed() } } |