From 810b445f8d2f429e9ad07625f9b67dba09783d7a Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:43:06 +0700 Subject: Rewrite `events` and `events_with` with a new `Runner` abstraction --- native/src/subscription.rs | 72 +++++++++++++++++++++++++++++++++------ native/src/subscription/events.rs | 35 ------------------- 2 files changed, 62 insertions(+), 45 deletions(-) delete mode 100644 native/src/subscription/events.rs diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 2950879c..7a1c5852 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -1,8 +1,12 @@ //! Listen to external events in your application. use crate::event::{self, Event}; use crate::Hasher; + +use iced_futures::futures::{self, Stream}; use iced_futures::BoxStream; +use std::hash::Hash; + /// A request to listen to external events. /// /// Besides performing async actions on demand with [`Command`], most @@ -29,20 +33,14 @@ pub type Tracker = pub use iced_futures::subscription::Recipe; -mod events; - -use events::Events; - /// Returns a [`Subscription`] to all the runtime events. /// /// This subscription will notify your application of any [`Event`] that was /// not captured by any widget. pub fn events() -> Subscription { - Subscription::from_recipe(Events { - f: |event, status| match status { - event::Status::Ignored => Some(event), - event::Status::Captured => None, - }, + events_with(|event, status| match status { + event::Status::Ignored => Some(event), + event::Status::Captured => None, }) } @@ -60,5 +58,59 @@ pub fn events_with( where Message: 'static + Send, { - Subscription::from_recipe(Events { f }) + #[derive(Debug, Clone, Copy, Hash)] + struct Events(u64); + + let hash = { + use std::hash::Hasher as _; + + let mut hasher = Hasher::default(); + + f.hash(&mut hasher); + + hasher.finish() + }; + + Subscription::from_recipe(Runner { + initial: Events(hash), + spawn: move |_, events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(f(event, status)) + }) + }, + }) +} + +struct Runner +where + F: FnOnce(T, EventStream) -> S, + S: Stream, +{ + initial: T, + spawn: F, +} + +impl Recipe + for Runner +where + T: Clone + Hash + 'static, + F: FnOnce(T, EventStream) -> S, + S: Stream + Send + 'static, +{ + type Output = Message; + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::().hash(state); + + self.initial.hash(state); + } + + fn stream(self: Box, input: EventStream) -> BoxStream { + use futures::stream::StreamExt; + + (self.spawn)(self.initial, input).boxed() + } } diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs deleted file mode 100644 index ca143bb3..00000000 --- a/native/src/subscription/events.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::event::{self, Event}; -use crate::subscription::{EventStream, Recipe}; -use crate::Hasher; -use iced_futures::futures::future; -use iced_futures::futures::StreamExt; -use iced_futures::BoxStream; - -pub struct Events { - pub(super) f: fn(Event, event::Status) -> Option, -} - -impl Recipe for Events -where - Message: 'static + Send, -{ - type Output = Message; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - struct Marker; - std::any::TypeId::of::().hash(state); - self.f.hash(state); - } - - fn stream( - self: Box, - event_stream: EventStream, - ) -> BoxStream { - let stream = event_stream.filter_map(move |(event, status)| { - future::ready((self.f)(event, status)) - }); - iced_futures::boxed_stream(stream) - } -} -- cgit From 7442d0b66f1c7b4c912ad5ac358dafcc8b07824e Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:43:54 +0700 Subject: Implement `subscription::run` :tada: --- native/src/subscription.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 7a1c5852..420797d1 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -84,6 +84,25 @@ where }) } +/// Returns a [`Subscription`] that will create and asynchronously run the +/// [`Stream`] returned by the provided closure. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn run( + initial: T, + f: impl FnOnce(T) -> S + 'static, +) -> Subscription +where + Message: 'static, + T: Clone + Hash + 'static, + S: Stream + Send + 'static, +{ + Subscription::from_recipe(Runner { + initial, + spawn: move |initial, _| f(initial), + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From 2a3271dc106b702a6b81888506578ec5f845281b Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:55:27 +0700 Subject: Implement `subscription::unfold` :tada: --- native/src/subscription.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 420797d1..9ff89ccf 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,8 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::{self, Stream}; +use iced_futures::futures::channel::mpsc; +use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; use std::hash::Hash; @@ -103,6 +104,26 @@ where }) } +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn unfold( + initial: T, + mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, +) -> Subscription +where + Message: 'static, + T: Clone + Hash + Send + 'static, + Fut: Future + Send + 'static, +{ + use futures::future::FutureExt; + + run(initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From 35e4f307595cbb67687afcbc8d96ad97109210b5 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:55:42 +0700 Subject: Implement `subscription::worker` :tada: --- native/src/subscription.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 9ff89ccf..85ad96fa 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -124,6 +124,50 @@ where }) } +/// Returns a [`Subscription`] that will open a channel and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// When the [`Subscription`] starts, an `on_ready` message will be produced +/// containing the [`mpsc::Sender`] end of the channel, which can be used by +/// the parent application to send `Input` to the running [`Subscription`]. +/// +/// The provided closure should use the [`mpsc::Receiver`] argument to await for +/// any `Input`. +/// +/// This function is really useful to create asynchronous workers with +/// bidirectional communication with a parent application. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn worker( + initial: T, + on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, + f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, +) -> Subscription +where + T: Clone + Hash + Send + 'static, + Fut: Future + Send + 'static, + Message: Send + 'static, + Input: Send + 'static, +{ + use futures::future; + use futures::stream::StreamExt; + + run(initial, move |initial| { + let (sender, receiver) = mpsc::channel(100); + + futures::stream::once(future::ready(on_ready(sender))).chain( + futures::stream::unfold( + (f, initial, receiver), + move |(mut f, state, mut receiver)| async { + let (message, state) = f(state, &mut receiver).await; + + Some((message, (f, state, receiver))) + }, + ), + ) + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From 75348c5b8cbe7f997e4c704021fdce8fd9bd862c Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 15 Jan 2022 11:45:19 +0700 Subject: Use `subscription::run` for `download_progress` example --- examples/download_progress/src/download.rs | 153 +++++++++++++---------------- 1 file changed, 69 insertions(+), 84 deletions(-) diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 08805f13..06d5d3fd 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -1,112 +1,97 @@ +use futures::Stream; use iced_futures::futures; -use std::hash::{Hash, Hasher}; +use iced_native::subscription; + +use std::hash::Hash; // Just a little utility function pub fn file( id: I, url: T, ) -> iced::Subscription<(I, Progress)> { - iced::Subscription::from_recipe(Download { - id, - url: url.to_string(), - }) + subscription::run( + Download { + id, + url: url.to_string(), + }, + download, + ) } +#[derive(Debug, Hash, Clone)] pub struct Download { id: I, url: String, } -// Make sure iced can use our download stream -impl iced_native::subscription::Recipe for Download -where - T: 'static + Hash + Copy + Send, - H: Hasher, -{ - type Output = (T, Progress); - - fn hash(&self, state: &mut H) { - struct Marker; - std::any::TypeId::of::().hash(state); - - self.id.hash(state); - } - - fn stream( - self: Box, - _input: futures::stream::BoxStream<'static, I>, - ) -> futures::stream::BoxStream<'static, Self::Output> { - let id = self.id; +fn download( + download: Download, +) -> impl Stream { + let id = download.id; - Box::pin(futures::stream::unfold( - State::Ready(self.url), - move |state| async move { - match state { - State::Ready(url) => { - let response = reqwest::get(&url).await; + futures::stream::unfold( + State::Ready(download.url), + move |state| async move { + match state { + State::Ready(url) => { + let response = reqwest::get(&url).await; - match response { - Ok(response) => { - if let Some(total) = response.content_length() { - Some(( - (id, Progress::Started), - State::Downloading { - response, - total, - downloaded: 0, - }, - )) - } else { - Some(( - (id, Progress::Errored), - State::Finished, - )) - } - } - Err(_) => { + match response { + Ok(response) => { + if let Some(total) = response.content_length() { + Some(( + (id, Progress::Started), + State::Downloading { + response, + total, + downloaded: 0, + }, + )) + } else { Some(((id, Progress::Errored), State::Finished)) } } - } - State::Downloading { - mut response, - total, - downloaded, - } => match response.chunk().await { - Ok(Some(chunk)) => { - let downloaded = downloaded + chunk.len() as u64; - - let percentage = - (downloaded as f32 / total as f32) * 100.0; - - Some(( - (id, Progress::Advanced(percentage)), - State::Downloading { - response, - total, - downloaded, - }, - )) - } - Ok(None) => { - Some(((id, Progress::Finished), State::Finished)) - } Err(_) => { Some(((id, Progress::Errored), State::Finished)) } - }, - State::Finished => { - // We do not let the stream die, as it would start a - // new download repeatedly if the user is not careful - // in case of errors. - let _: () = iced::futures::future::pending().await; + } + } + State::Downloading { + mut response, + total, + downloaded, + } => match response.chunk().await { + Ok(Some(chunk)) => { + let downloaded = downloaded + chunk.len() as u64; - None + let percentage = + (downloaded as f32 / total as f32) * 100.0; + + Some(( + (id, Progress::Advanced(percentage)), + State::Downloading { + response, + total, + downloaded, + }, + )) + } + Ok(None) => { + Some(((id, Progress::Finished), State::Finished)) } + Err(_) => Some(((id, Progress::Errored), State::Finished)), + }, + State::Finished => { + // We do not let the stream die, as it would start a + // new download repeatedly if the user is not careful + // in case of errors. + let _: () = iced::futures::future::pending().await; + + None } - }, - )) - } + } + }, + ) } #[derive(Debug, Clone)] -- cgit From 2f557731f313ccc94b5d0ebb4ee5603624670daf Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 15 Jan 2022 11:56:19 +0700 Subject: Make `clock` example responsive --- examples/clock/src/main.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/examples/clock/src/main.rs b/examples/clock/src/main.rs index de9e879a..325ccc1a 100644 --- a/examples/clock/src/main.rs +++ b/examples/clock/src/main.rs @@ -66,16 +66,12 @@ impl Application for Clock { } fn view(&mut self) -> Element { - let canvas = Canvas::new(self) - .width(Length::Units(400)) - .height(Length::Units(400)); + let canvas = Canvas::new(self).width(Length::Fill).height(Length::Fill); Container::new(canvas) .width(Length::Fill) .height(Length::Fill) .padding(20) - .center_x() - .center_y() .into() } } -- cgit From dc50a2830ab553dfc5dfc28d4fd1af6b3981c656 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 16 Jan 2022 15:50:19 +0700 Subject: Draft `websocket` example :tada: --- Cargo.toml | 1 + examples/download_progress/src/download.rs | 1 + examples/websocket/Cargo.toml | 22 ++++ examples/websocket/README.md | 12 +++ examples/websocket/src/echo.rs | 146 ++++++++++++++++++++++++++ examples/websocket/src/echo/server.rs | 57 ++++++++++ examples/websocket/src/main.rs | 162 +++++++++++++++++++++++++++++ native/src/subscription.rs | 98 +++++------------ 8 files changed, 426 insertions(+), 73 deletions(-) create mode 100644 examples/websocket/Cargo.toml create mode 100644 examples/websocket/README.md create mode 100644 examples/websocket/src/echo.rs create mode 100644 examples/websocket/src/echo/server.rs create mode 100644 examples/websocket/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index bb50888a..e553c78f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ members = [ "examples/tooltip", "examples/tour", "examples/url_handler", + "examples/websocket", ] [dependencies] diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 06d5d3fd..20682e7a 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -10,6 +10,7 @@ pub fn file( url: T, ) -> iced::Subscription<(I, Progress)> { subscription::run( + id, Download { id, url: url.to_string(), diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml new file mode 100644 index 00000000..6b4d9d10 --- /dev/null +++ b/examples/websocket/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "websocket" +version = "0.1.0" +authors = ["Héctor Ramón Jiménez "] +edition = "2018" +publish = false + +[dependencies] +iced = { path = "../..", features = ["tokio", "debug"] } +iced_native = { path = "../../native" } +iced_futures = { path = "../../futures" } + +[dependencies.async-tungstenite] +version = "0.16" +features = ["tokio-rustls-webpki-roots"] + +[dependencies.tokio] +version = "1" +features = ["time"] + +[dependencies.warp] +version = "0.3" diff --git a/examples/websocket/README.md b/examples/websocket/README.md new file mode 100644 index 00000000..fcdee9f3 --- /dev/null +++ b/examples/websocket/README.md @@ -0,0 +1,12 @@ +## Websocket + +A simple example that keeps a WebSocket connection open to an echo server. + +The __[`main`]__ file contains all the code of the example. + +You can run it with `cargo run`: +``` +cargo run --package websocket +``` + +[`main`]: src/main.rs diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs new file mode 100644 index 00000000..13596ddd --- /dev/null +++ b/examples/websocket/src/echo.rs @@ -0,0 +1,146 @@ +pub mod server; + +use iced_futures::futures; +use iced_native::subscription::{self, Subscription}; + +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::stream::StreamExt; + +use async_tungstenite::tungstenite; + +pub fn connect() -> Subscription { + struct Connect; + + subscription::unfold( + std::any::TypeId::of::(), + State::Disconnected, + |state| async move { + match state { + State::Disconnected => { + const ECHO_SERVER: &str = "ws://localhost:3030"; + + match async_tungstenite::tokio::connect_async(ECHO_SERVER) + .await + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); + + ( + Some(Event::Connected(Connection(sender))), + State::Connected(websocket, receiver), + ) + } + Err(_) => { + let _ = tokio::time::sleep( + tokio::time::Duration::from_secs(1), + ) + .await; + + (Some(Event::Disconnected), State::Disconnected) + } + } + } + State::Connected(mut websocket, mut input) => { + let mut fused_websocket = websocket.by_ref().fuse(); + + futures::select! { + received = fused_websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + ( + Some(Event::MessageReceived(Message::User(message))), + State::Connected(websocket, input) + ) + } + Ok(_) => { + (None, State::Connected(websocket, input)) + } + Err(_) => { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(String::from(message))).await; + + if result.is_ok() { + (None, State::Connected(websocket, input)) + } else { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + } + } + }, + ) +} + +#[derive(Debug)] +enum State { + Disconnected, + Connected( + async_tungstenite::WebSocketStream< + async_tungstenite::tokio::ConnectStream, + >, + mpsc::Receiver, + ), +} + +#[derive(Debug, Clone)] +pub enum Event { + Connected(Connection), + Disconnected, + MessageReceived(Message), +} + +#[derive(Debug, Clone)] +pub struct Connection(mpsc::Sender); + +impl Connection { + pub fn send(&mut self, message: Message) { + let _ = self + .0 + .try_send(message) + .expect("Send message to echo server"); + } +} + +#[derive(Debug, Clone)] +pub enum Message { + Connected, + Disconnected, + User(String), +} + +impl Message { + pub fn new(message: &str) -> Option { + if message.is_empty() { + None + } else { + Some(Self::User(message.to_string())) + } + } + + pub fn connected() -> Self { + Message::Connected + } + + pub fn disconnected() -> Self { + Message::Disconnected + } +} + +impl From for String { + fn from(message: Message) -> Self { + match message { + Message::Connected => String::from("Connected successfully!"), + Message::Disconnected => { + String::from("Connection lost... Retrying...") + } + Message::User(message) => message, + } + } +} diff --git a/examples/websocket/src/echo/server.rs b/examples/websocket/src/echo/server.rs new file mode 100644 index 00000000..7702d417 --- /dev/null +++ b/examples/websocket/src/echo/server.rs @@ -0,0 +1,57 @@ +use iced_futures::futures; + +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use warp::ws::WebSocket; +use warp::Filter; + +// Basic WebSocket echo server adapted from: +// https://github.com/seanmonstar/warp/blob/3ff2eaf41eb5ac9321620e5a6434d5b5ec6f313f/examples/websockets_chat.rs +// +// Copyright (c) 2018-2020 Sean McArthur +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +pub async fn run() { + let routes = warp::path::end().and(warp::ws()).map(|ws: warp::ws::Ws| { + ws.on_upgrade(move |socket| user_connected(socket)) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn user_connected(ws: WebSocket) { + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + let (mut tx, mut rx) = mpsc::unbounded(); + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + let _ = user_ws_tx.send(message).await.unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }); + } + }); + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(_) => break, + }; + + let _ = tx.send(msg).await; + } +} diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs new file mode 100644 index 00000000..c03a9f3a --- /dev/null +++ b/examples/websocket/src/main.rs @@ -0,0 +1,162 @@ +mod echo; + +use iced::alignment::{self, Alignment}; +use iced::button::{self, Button}; +use iced::executor; +use iced::scrollable::{self, Scrollable}; +use iced::text_input::{self, TextInput}; +use iced::{ + Application, Color, Column, Command, Container, Element, Length, Row, + Settings, Subscription, Text, +}; + +pub fn main() -> iced::Result { + WebSocket::run(Settings::default()) +} + +#[derive(Default)] +struct WebSocket { + messages: Vec, + message_log: scrollable::State, + new_message: String, + new_message_state: text_input::State, + new_message_button: button::State, + state: State, +} + +#[derive(Debug, Clone)] +enum Message { + NewMessageChanged(String), + Send(echo::Message), + Echo(echo::Event), + Server, +} + +impl Application for WebSocket { + type Message = Message; + type Flags = (); + type Executor = executor::Default; + + fn new(_flags: Self::Flags) -> (Self, Command) { + ( + Self::default(), + Command::perform(echo::server::run(), |_| Message::Server), + ) + } + + fn title(&self) -> String { + String::from("WebSocket - Iced") + } + + fn update(&mut self, message: Message) -> Command { + match message { + Message::NewMessageChanged(new_message) => { + self.new_message = new_message; + } + Message::Send(message) => match &mut self.state { + State::Connected(connection) => { + self.new_message.clear(); + + connection.send(message); + } + State::Disconnected => {} + }, + Message::Echo(event) => match event { + echo::Event::Connected(connection) => { + self.state = State::Connected(connection); + + self.messages.push(echo::Message::connected()); + } + echo::Event::Disconnected => { + self.state = State::Disconnected; + + self.messages.push(echo::Message::disconnected()); + } + echo::Event::MessageReceived(message) => { + self.messages.push(message); + self.message_log.snap_to(1.0); + } + }, + Message::Server => {} + } + + Command::none() + } + + fn subscription(&self) -> Subscription { + echo::connect().map(Message::Echo) + } + + fn view(&mut self) -> Element { + let message_log = if self.messages.is_empty() { + Container::new( + Text::new("Your messages will appear here...") + .color(Color::from_rgb8(0x88, 0x88, 0x88)), + ) + .width(Length::Fill) + .height(Length::Fill) + .center_x() + .center_y() + .into() + } else { + self.messages + .iter() + .cloned() + .fold( + Scrollable::new(&mut self.message_log), + |scrollable, message| scrollable.push(Text::new(message)), + ) + .width(Length::Fill) + .height(Length::Fill) + .spacing(10) + .into() + }; + + let new_message_input = { + let mut input = TextInput::new( + &mut self.new_message_state, + "Type a message...", + &self.new_message, + Message::NewMessageChanged, + ) + .padding(10); + + let mut button = Button::new( + &mut self.new_message_button, + Text::new("Send") + .height(Length::Fill) + .vertical_alignment(alignment::Vertical::Center), + ) + .padding([0, 20]); + + if matches!(self.state, State::Connected(_)) { + if let Some(message) = echo::Message::new(&self.new_message) { + input = input.on_submit(Message::Send(message.clone())); + button = button.on_press(Message::Send(message)); + } + } + + Row::with_children(vec![input.into(), button.into()]) + .spacing(10) + .align_items(Alignment::Fill) + }; + + Column::with_children(vec![message_log, new_message_input.into()]) + .width(Length::Fill) + .height(Length::Fill) + .padding(20) + .spacing(10) + .into() + } +} + +enum State { + Disconnected, + Connected(echo::Connection), +} + +impl Default for State { + fn default() -> Self { + Self::Disconnected + } +} diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 85ad96fa..70cd269e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,6 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::channel::mpsc; use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; @@ -59,21 +58,9 @@ pub fn events_with( where Message: 'static + Send, { - #[derive(Debug, Clone, Copy, Hash)] - struct Events(u64); - - let hash = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - - f.hash(&mut hasher); - - hasher.finish() - }; - Subscription::from_recipe(Runner { - initial: Events(hash), + id: f, + initial: (), spawn: move |_, events| { use futures::future; use futures::stream::StreamExt; @@ -89,16 +76,19 @@ where /// [`Stream`] returned by the provided closure. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( +pub fn run( + id: I, initial: T, f: impl FnOnce(T) -> S + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, S: Stream + Send + 'static, + Message: 'static, { Subscription::from_recipe(Runner { + id, initial, spawn: move |initial, _| f(initial), }) @@ -108,79 +98,41 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn unfold( +pub fn unfold( + id: I, initial: T, mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, -{ - use futures::future::FutureExt; - - run(initial, move |initial| { - futures::stream::unfold(initial, move |state| f(state).map(Some)) - }) -} - -/// Returns a [`Subscription`] that will open a channel and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// When the [`Subscription`] starts, an `on_ready` message will be produced -/// containing the [`mpsc::Sender`] end of the channel, which can be used by -/// the parent application to send `Input` to the running [`Subscription`]. -/// -/// The provided closure should use the [`mpsc::Receiver`] argument to await for -/// any `Input`. -/// -/// This function is really useful to create asynchronous workers with -/// bidirectional communication with a parent application. -/// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn worker( - initial: T, - on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, - f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, -) -> Subscription -where - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, - Message: Send + 'static, - Input: Send + 'static, + I: Hash + 'static, + T: Send + 'static, + Fut: Future, T)> + Send + 'static, + Message: 'static + Send, { - use futures::future; + use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(initial, move |initial| { - let (sender, receiver) = mpsc::channel(100); - - futures::stream::once(future::ready(on_ready(sender))).chain( - futures::stream::unfold( - (f, initial, receiver), - move |(mut f, state, mut receiver)| async { - let (message, state) = f(state, &mut receiver).await; - - Some((message, (f, state, receiver))) - }, - ), - ) + run(id, initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready) }) } -struct Runner +struct Runner where F: FnOnce(T, EventStream) -> S, S: Stream, { + id: I, initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, F: FnOnce(T, EventStream) -> S, S: Stream + Send + 'static, { @@ -189,7 +141,7 @@ where fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::().hash(state); - self.initial.hash(state); + self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { -- cgit From 88f1168a0b58759efb622c0215edd3d6b23bd059 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 16 Jan 2022 19:59:59 +0700 Subject: Update `README` file of `websocket` example --- examples/websocket/README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/websocket/README.md b/examples/websocket/README.md index fcdee9f3..16e983da 100644 --- a/examples/websocket/README.md +++ b/examples/websocket/README.md @@ -2,7 +2,10 @@ A simple example that keeps a WebSocket connection open to an echo server. -The __[`main`]__ file contains all the code of the example. +The example consists of 3 modules: +- [`main`] contains the `Application` logic. +- [`echo`] implements a WebSocket client for the [`echo::server`] with `async-tungstenite`. +- [`echo::server`] implements a simple WebSocket echo server with `warp`. You can run it with `cargo run`: ``` @@ -10,3 +13,5 @@ cargo run --package websocket ``` [`main`]: src/main.rs +[`echo`]: src/echo.rs +[`echo::server`]: src/echo/server.rs -- cgit From ddbbe7353bce0827160cb8d539a3114c159b3745 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 17 Jan 2022 15:29:41 +0700 Subject: Simplify `run` and `unfold` helpers to build a `Subscription` --- examples/download_progress/src/download.rs | 122 ++++++++++++----------------- native/src/subscription.rs | 43 ++++------ 2 files changed, 69 insertions(+), 96 deletions(-) diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 20682e7a..7db1206b 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -1,22 +1,15 @@ -use futures::Stream; -use iced_futures::futures; use iced_native::subscription; use std::hash::Hash; // Just a little utility function -pub fn file( +pub fn file( id: I, url: T, ) -> iced::Subscription<(I, Progress)> { - subscription::run( - id, - Download { - id, - url: url.to_string(), - }, - download, - ) + subscription::unfold(id, State::Ready(url.to_string()), move |state| { + download(id, state) + }) } #[derive(Debug, Hash, Clone)] @@ -25,74 +18,63 @@ pub struct Download { url: String, } -fn download( - download: Download, -) -> impl Stream { - let id = download.id; - - futures::stream::unfold( - State::Ready(download.url), - move |state| async move { - match state { - State::Ready(url) => { - let response = reqwest::get(&url).await; - - match response { - Ok(response) => { - if let Some(total) = response.content_length() { - Some(( - (id, Progress::Started), - State::Downloading { - response, - total, - downloaded: 0, - }, - )) - } else { - Some(((id, Progress::Errored), State::Finished)) - } - } - Err(_) => { - Some(((id, Progress::Errored), State::Finished)) - } - } - } - State::Downloading { - mut response, - total, - downloaded, - } => match response.chunk().await { - Ok(Some(chunk)) => { - let downloaded = downloaded + chunk.len() as u64; - - let percentage = - (downloaded as f32 / total as f32) * 100.0; +async fn download( + id: I, + state: State, +) -> (Option<(I, Progress)>, State) { + match state { + State::Ready(url) => { + let response = reqwest::get(&url).await; - Some(( - (id, Progress::Advanced(percentage)), + match response { + Ok(response) => { + if let Some(total) = response.content_length() { + ( + Some((id, Progress::Started)), State::Downloading { response, total, - downloaded, + downloaded: 0, }, - )) + ) + } else { + (Some((id, Progress::Errored)), State::Finished) } - Ok(None) => { - Some(((id, Progress::Finished), State::Finished)) - } - Err(_) => Some(((id, Progress::Errored), State::Finished)), - }, - State::Finished => { - // We do not let the stream die, as it would start a - // new download repeatedly if the user is not careful - // in case of errors. - let _: () = iced::futures::future::pending().await; - - None } + Err(_) => (Some((id, Progress::Errored)), State::Finished), + } + } + State::Downloading { + mut response, + total, + downloaded, + } => match response.chunk().await { + Ok(Some(chunk)) => { + let downloaded = downloaded + chunk.len() as u64; + + let percentage = (downloaded as f32 / total as f32) * 100.0; + + ( + Some((id, Progress::Advanced(percentage))), + State::Downloading { + response, + total, + downloaded, + }, + ) } + Ok(None) => (Some((id, Progress::Finished)), State::Finished), + Err(_) => (Some((id, Progress::Errored)), State::Finished), }, - ) + State::Finished => { + // We do not let the stream die, as it would start a + // new download repeatedly if the user is not careful + // in case of errors. + let _: () = iced::futures::future::pending().await; + + unreachable!() + } + } } #[derive(Debug, Clone)] diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 70cd269e..ece6556e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -60,8 +60,7 @@ where { Subscription::from_recipe(Runner { id: f, - initial: (), - spawn: move |_, events| { + spawn: move |events| { use futures::future; use futures::stream::StreamExt; @@ -73,31 +72,25 @@ where } /// Returns a [`Subscription`] that will create and asynchronously run the -/// [`Stream`] returned by the provided closure. +/// given [`Stream`]. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( - id: I, - initial: T, - f: impl FnOnce(T) -> S + 'static, -) -> Subscription +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run(id: I, stream: S) -> Subscription where I: Hash + 'static, - T: 'static, S: Stream + Send + 'static, Message: 'static, { Subscription::from_recipe(Runner { id, - initial, - spawn: move |initial, _| f(initial), + spawn: move |_| stream, }) } /// Returns a [`Subscription`] that will create and asynchronously run a /// [`Stream`] that will call the provided closure to produce every `Message`. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. +/// The `id` will be used to uniquely identify the [`Subscription`]. pub fn unfold( id: I, initial: T, @@ -112,41 +105,39 @@ where use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(id, initial, move |initial| { + run( + id, futures::stream::unfold(initial, move |state| f(state).map(Some)) - .filter_map(future::ready) - }) + .filter_map(future::ready), + ) } -struct Runner +struct Runner where - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream, { id: I, - initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where I: Hash + 'static, - T: 'static, - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream + Send + 'static, { type Output = Message; fn hash(&self, state: &mut Hasher) { - std::any::TypeId::of::().hash(state); - + std::any::TypeId::of::().hash(state); self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { use futures::stream::StreamExt; - (self.spawn)(self.initial, input).boxed() + (self.spawn)(input).boxed() } } -- cgit From 5ce8653fb51c035e0a6fe1ba7ab363018cdf107b Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 17 Jan 2022 15:48:37 +0700 Subject: Add worker example to docs of `subscription::unfold` --- native/src/subscription.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/native/src/subscription.rs b/native/src/subscription.rs index ece6556e..63834654 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -91,6 +91,71 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `id` will be used to uniquely identify the [`Subscription`]. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_native::subscription::{self, Subscription}; +/// use iced_native::futures::channel::mpsc; +/// +/// pub enum Event { +/// Ready(mpsc::Sender), +/// WorkFinished, +/// // ... +/// } +/// +/// enum Input { +/// DoSomeWork, +/// // ... +/// } +/// +/// enum State { +/// Starting, +/// Ready(mpsc::Receiver), +/// } +/// +/// fn some_worker() -> Subscription { +/// struct SomeWorker; +/// +/// subscription::unfold(std::any::TypeId::of::(), State::Starting, |state| async move { +/// match state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// (Some(Event::Ready(sender)), State::Ready(receiver)) +/// } +/// State::Ready(mut receiver) => { +/// use iced_native::futures::StreamExt; +/// +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; +/// +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... +/// +/// // Finally, we can optionally return a message to tell the +/// // `Application` the work is done +/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// } +/// } +/// } +/// } +/// }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.4/examples/websocket pub fn unfold( id: I, initial: T, -- cgit