summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/download_progress/src/download.rs122
-rw-r--r--native/src/subscription.rs43
2 files changed, 69 insertions, 96 deletions
diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs
index 20682e7a..7db1206b 100644
--- a/examples/download_progress/src/download.rs
+++ b/examples/download_progress/src/download.rs
@@ -1,22 +1,15 @@
-use futures::Stream;
-use iced_futures::futures;
use iced_native::subscription;
use std::hash::Hash;
// Just a little utility function
-pub fn file<I: 'static + Hash + Copy + Send, T: ToString>(
+pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I,
url: T,
) -> iced::Subscription<(I, Progress)> {
- subscription::run(
- id,
- Download {
- id,
- url: url.to_string(),
- },
- download,
- )
+ subscription::unfold(id, State::Ready(url.to_string()), move |state| {
+ download(id, state)
+ })
}
#[derive(Debug, Hash, Clone)]
@@ -25,74 +18,63 @@ pub struct Download<I> {
url: String,
}
-fn download<I: Copy>(
- download: Download<I>,
-) -> impl Stream<Item = (I, Progress)> {
- let id = download.id;
-
- futures::stream::unfold(
- State::Ready(download.url),
- move |state| async move {
- match state {
- State::Ready(url) => {
- let response = reqwest::get(&url).await;
-
- match response {
- Ok(response) => {
- if let Some(total) = response.content_length() {
- Some((
- (id, Progress::Started),
- State::Downloading {
- response,
- total,
- downloaded: 0,
- },
- ))
- } else {
- Some(((id, Progress::Errored), State::Finished))
- }
- }
- Err(_) => {
- Some(((id, Progress::Errored), State::Finished))
- }
- }
- }
- State::Downloading {
- mut response,
- total,
- downloaded,
- } => match response.chunk().await {
- Ok(Some(chunk)) => {
- let downloaded = downloaded + chunk.len() as u64;
-
- let percentage =
- (downloaded as f32 / total as f32) * 100.0;
+async fn download<I: Copy>(
+ id: I,
+ state: State,
+) -> (Option<(I, Progress)>, State) {
+ match state {
+ State::Ready(url) => {
+ let response = reqwest::get(&url).await;
- Some((
- (id, Progress::Advanced(percentage)),
+ match response {
+ Ok(response) => {
+ if let Some(total) = response.content_length() {
+ (
+ Some((id, Progress::Started)),
State::Downloading {
response,
total,
- downloaded,
+ downloaded: 0,
},
- ))
+ )
+ } else {
+ (Some((id, Progress::Errored)), State::Finished)
}
- Ok(None) => {
- Some(((id, Progress::Finished), State::Finished))
- }
- Err(_) => Some(((id, Progress::Errored), State::Finished)),
- },
- State::Finished => {
- // We do not let the stream die, as it would start a
- // new download repeatedly if the user is not careful
- // in case of errors.
- let _: () = iced::futures::future::pending().await;
-
- None
}
+ Err(_) => (Some((id, Progress::Errored)), State::Finished),
+ }
+ }
+ State::Downloading {
+ mut response,
+ total,
+ downloaded,
+ } => match response.chunk().await {
+ Ok(Some(chunk)) => {
+ let downloaded = downloaded + chunk.len() as u64;
+
+ let percentage = (downloaded as f32 / total as f32) * 100.0;
+
+ (
+ Some((id, Progress::Advanced(percentage))),
+ State::Downloading {
+ response,
+ total,
+ downloaded,
+ },
+ )
}
+ Ok(None) => (Some((id, Progress::Finished)), State::Finished),
+ Err(_) => (Some((id, Progress::Errored)), State::Finished),
},
- )
+ State::Finished => {
+ // We do not let the stream die, as it would start a
+ // new download repeatedly if the user is not careful
+ // in case of errors.
+ let _: () = iced::futures::future::pending().await;
+
+ unreachable!()
+ }
+ }
}
#[derive(Debug, Clone)]
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 70cd269e..ece6556e 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -60,8 +60,7 @@ where
{
Subscription::from_recipe(Runner {
id: f,
- initial: (),
- spawn: move |_, events| {
+ spawn: move |events| {
use futures::future;
use futures::stream::StreamExt;
@@ -73,31 +72,25 @@ where
}
/// Returns a [`Subscription`] that will create and asynchronously run the
-/// [`Stream`] returned by the provided closure.
+/// given [`Stream`].
///
-/// The `initial` state will be used to uniquely identify the [`Subscription`].
-pub fn run<I, T, S, Message>(
- id: I,
- initial: T,
- f: impl FnOnce(T) -> S + 'static,
-) -> Subscription<Message>
+/// The `id` will be used to uniquely identify the [`Subscription`].
+pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message>
where
I: Hash + 'static,
- T: 'static,
S: Stream<Item = Message> + Send + 'static,
Message: 'static,
{
Subscription::from_recipe(Runner {
id,
- initial,
- spawn: move |initial, _| f(initial),
+ spawn: move |_| stream,
})
}
/// Returns a [`Subscription`] that will create and asynchronously run a
/// [`Stream`] that will call the provided closure to produce every `Message`.
///
-/// The `initial` state will be used to uniquely identify the [`Subscription`].
+/// The `id` will be used to uniquely identify the [`Subscription`].
pub fn unfold<I, T, Fut, Message>(
id: I,
initial: T,
@@ -112,41 +105,39 @@ where
use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
- run(id, initial, move |initial| {
+ run(
+ id,
futures::stream::unfold(initial, move |state| f(state).map(Some))
- .filter_map(future::ready)
- })
+ .filter_map(future::ready),
+ )
}
-struct Runner<I, T, F, S, Message>
+struct Runner<I, F, S, Message>
where
- F: FnOnce(T, EventStream) -> S,
+ F: FnOnce(EventStream) -> S,
S: Stream<Item = Message>,
{
id: I,
- initial: T,
spawn: F,
}
-impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)>
- for Runner<I, T, F, S, Message>
+impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)>
+ for Runner<I, F, S, Message>
where
I: Hash + 'static,
- T: 'static,
- F: FnOnce(T, EventStream) -> S,
+ F: FnOnce(EventStream) -> S,
S: Stream<Item = Message> + Send + 'static,
{
type Output = Message;
fn hash(&self, state: &mut Hasher) {
- std::any::TypeId::of::<T>().hash(state);
-
+ std::any::TypeId::of::<I>().hash(state);
self.id.hash(state);
}
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
use futures::stream::StreamExt;
- (self.spawn)(self.initial, input).boxed()
+ (self.spawn)(input).boxed()
}
}