diff options
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | examples/clock/src/main.rs | 6 | ||||
-rw-r--r-- | examples/download_progress/src/download.rs | 142 | ||||
-rw-r--r-- | examples/websocket/Cargo.toml | 22 | ||||
-rw-r--r-- | examples/websocket/README.md | 17 | ||||
-rw-r--r-- | examples/websocket/src/echo.rs | 146 | ||||
-rw-r--r-- | examples/websocket/src/echo/server.rs | 57 | ||||
-rw-r--r-- | examples/websocket/src/main.rs | 162 | ||||
-rw-r--r-- | native/src/subscription.rs | 164 | ||||
-rw-r--r-- | native/src/subscription/events.rs | 35 |
10 files changed, 615 insertions, 137 deletions
@@ -88,6 +88,7 @@ members = [ "examples/tooltip", "examples/tour", "examples/url_handler", + "examples/websocket", ] [dependencies] diff --git a/examples/clock/src/main.rs b/examples/clock/src/main.rs index de9e879a..325ccc1a 100644 --- a/examples/clock/src/main.rs +++ b/examples/clock/src/main.rs @@ -66,16 +66,12 @@ impl Application for Clock { } fn view(&mut self) -> Element<Message> { - let canvas = Canvas::new(self) - .width(Length::Units(400)) - .height(Length::Units(400)); + let canvas = Canvas::new(self).width(Length::Fill).height(Length::Fill); Container::new(canvas) .width(Length::Fill) .height(Length::Fill) .padding(20) - .center_x() - .center_y() .into() } } diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 08805f13..7db1206b 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -1,111 +1,79 @@ -use iced_futures::futures; -use std::hash::{Hash, Hasher}; +use iced_native::subscription; + +use std::hash::Hash; // Just a little utility function -pub fn file<I: 'static + Hash + Copy + Send, T: ToString>( +pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>( id: I, url: T, ) -> iced::Subscription<(I, Progress)> { - iced::Subscription::from_recipe(Download { - id, - url: url.to_string(), + subscription::unfold(id, State::Ready(url.to_string()), move |state| { + download(id, state) }) } +#[derive(Debug, Hash, Clone)] pub struct Download<I> { id: I, url: String, } -// Make sure iced can use our download stream -impl<H, I, T> iced_native::subscription::Recipe<H, I> for Download<T> -where - T: 'static + Hash + Copy + Send, - H: Hasher, -{ - type Output = (T, Progress); - - fn hash(&self, state: &mut H) { - struct Marker; - std::any::TypeId::of::<Marker>().hash(state); - - self.id.hash(state); - } +async fn download<I: Copy>( + id: I, + state: State, +) -> (Option<(I, Progress)>, State) { + match state { + State::Ready(url) => { + let response = reqwest::get(&url).await; - fn stream( - self: Box<Self>, - _input: futures::stream::BoxStream<'static, I>, - ) -> futures::stream::BoxStream<'static, Self::Output> { - let id = self.id; + match response { + Ok(response) => { + if let Some(total) = response.content_length() { + ( + Some((id, Progress::Started)), + State::Downloading { + response, + total, + downloaded: 0, + }, + ) + } else { + (Some((id, Progress::Errored)), State::Finished) + } + } + Err(_) => (Some((id, Progress::Errored)), State::Finished), + } + } + State::Downloading { + mut response, + total, + downloaded, + } => match response.chunk().await { + Ok(Some(chunk)) => { + let downloaded = downloaded + chunk.len() as u64; - Box::pin(futures::stream::unfold( - State::Ready(self.url), - move |state| async move { - match state { - State::Ready(url) => { - let response = reqwest::get(&url).await; + let percentage = (downloaded as f32 / total as f32) * 100.0; - match response { - Ok(response) => { - if let Some(total) = response.content_length() { - Some(( - (id, Progress::Started), - State::Downloading { - response, - total, - downloaded: 0, - }, - )) - } else { - Some(( - (id, Progress::Errored), - State::Finished, - )) - } - } - Err(_) => { - Some(((id, Progress::Errored), State::Finished)) - } - } - } + ( + Some((id, Progress::Advanced(percentage))), State::Downloading { - mut response, + response, total, downloaded, - } => match response.chunk().await { - Ok(Some(chunk)) => { - let downloaded = downloaded + chunk.len() as u64; - - let percentage = - (downloaded as f32 / total as f32) * 100.0; - - Some(( - (id, Progress::Advanced(percentage)), - State::Downloading { - response, - total, - downloaded, - }, - )) - } - Ok(None) => { - Some(((id, Progress::Finished), State::Finished)) - } - Err(_) => { - Some(((id, Progress::Errored), State::Finished)) - } }, - State::Finished => { - // We do not let the stream die, as it would start a - // new download repeatedly if the user is not careful - // in case of errors. - let _: () = iced::futures::future::pending().await; + ) + } + Ok(None) => (Some((id, Progress::Finished)), State::Finished), + Err(_) => (Some((id, Progress::Errored)), State::Finished), + }, + State::Finished => { + // We do not let the stream die, as it would start a + // new download repeatedly if the user is not careful + // in case of errors. + let _: () = iced::futures::future::pending().await; - None - } - } - }, - )) + unreachable!() + } } } diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml new file mode 100644 index 00000000..6b4d9d10 --- /dev/null +++ b/examples/websocket/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "websocket" +version = "0.1.0" +authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"] +edition = "2018" +publish = false + +[dependencies] +iced = { path = "../..", features = ["tokio", "debug"] } +iced_native = { path = "../../native" } +iced_futures = { path = "../../futures" } + +[dependencies.async-tungstenite] +version = "0.16" +features = ["tokio-rustls-webpki-roots"] + +[dependencies.tokio] +version = "1" +features = ["time"] + +[dependencies.warp] +version = "0.3" diff --git a/examples/websocket/README.md b/examples/websocket/README.md new file mode 100644 index 00000000..16e983da --- /dev/null +++ b/examples/websocket/README.md @@ -0,0 +1,17 @@ +## Websocket + +A simple example that keeps a WebSocket connection open to an echo server. + +The example consists of 3 modules: +- [`main`] contains the `Application` logic. +- [`echo`] implements a WebSocket client for the [`echo::server`] with `async-tungstenite`. +- [`echo::server`] implements a simple WebSocket echo server with `warp`. + +You can run it with `cargo run`: +``` +cargo run --package websocket +``` + +[`main`]: src/main.rs +[`echo`]: src/echo.rs +[`echo::server`]: src/echo/server.rs diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs new file mode 100644 index 00000000..13596ddd --- /dev/null +++ b/examples/websocket/src/echo.rs @@ -0,0 +1,146 @@ +pub mod server; + +use iced_futures::futures; +use iced_native::subscription::{self, Subscription}; + +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::stream::StreamExt; + +use async_tungstenite::tungstenite; + +pub fn connect() -> Subscription<Event> { + struct Connect; + + subscription::unfold( + std::any::TypeId::of::<Connect>(), + State::Disconnected, + |state| async move { + match state { + State::Disconnected => { + const ECHO_SERVER: &str = "ws://localhost:3030"; + + match async_tungstenite::tokio::connect_async(ECHO_SERVER) + .await + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); + + ( + Some(Event::Connected(Connection(sender))), + State::Connected(websocket, receiver), + ) + } + Err(_) => { + let _ = tokio::time::sleep( + tokio::time::Duration::from_secs(1), + ) + .await; + + (Some(Event::Disconnected), State::Disconnected) + } + } + } + State::Connected(mut websocket, mut input) => { + let mut fused_websocket = websocket.by_ref().fuse(); + + futures::select! { + received = fused_websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + ( + Some(Event::MessageReceived(Message::User(message))), + State::Connected(websocket, input) + ) + } + Ok(_) => { + (None, State::Connected(websocket, input)) + } + Err(_) => { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(String::from(message))).await; + + if result.is_ok() { + (None, State::Connected(websocket, input)) + } else { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + } + } + }, + ) +} + +#[derive(Debug)] +enum State { + Disconnected, + Connected( + async_tungstenite::WebSocketStream< + async_tungstenite::tokio::ConnectStream, + >, + mpsc::Receiver<Message>, + ), +} + +#[derive(Debug, Clone)] +pub enum Event { + Connected(Connection), + Disconnected, + MessageReceived(Message), +} + +#[derive(Debug, Clone)] +pub struct Connection(mpsc::Sender<Message>); + +impl Connection { + pub fn send(&mut self, message: Message) { + let _ = self + .0 + .try_send(message) + .expect("Send message to echo server"); + } +} + +#[derive(Debug, Clone)] +pub enum Message { + Connected, + Disconnected, + User(String), +} + +impl Message { + pub fn new(message: &str) -> Option<Self> { + if message.is_empty() { + None + } else { + Some(Self::User(message.to_string())) + } + } + + pub fn connected() -> Self { + Message::Connected + } + + pub fn disconnected() -> Self { + Message::Disconnected + } +} + +impl From<Message> for String { + fn from(message: Message) -> Self { + match message { + Message::Connected => String::from("Connected successfully!"), + Message::Disconnected => { + String::from("Connection lost... Retrying...") + } + Message::User(message) => message, + } + } +} diff --git a/examples/websocket/src/echo/server.rs b/examples/websocket/src/echo/server.rs new file mode 100644 index 00000000..7702d417 --- /dev/null +++ b/examples/websocket/src/echo/server.rs @@ -0,0 +1,57 @@ +use iced_futures::futures; + +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use warp::ws::WebSocket; +use warp::Filter; + +// Basic WebSocket echo server adapted from: +// https://github.com/seanmonstar/warp/blob/3ff2eaf41eb5ac9321620e5a6434d5b5ec6f313f/examples/websockets_chat.rs +// +// Copyright (c) 2018-2020 Sean McArthur +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +pub async fn run() { + let routes = warp::path::end().and(warp::ws()).map(|ws: warp::ws::Ws| { + ws.on_upgrade(move |socket| user_connected(socket)) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn user_connected(ws: WebSocket) { + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + let (mut tx, mut rx) = mpsc::unbounded(); + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + let _ = user_ws_tx.send(message).await.unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }); + } + }); + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(_) => break, + }; + + let _ = tx.send(msg).await; + } +} diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs new file mode 100644 index 00000000..c03a9f3a --- /dev/null +++ b/examples/websocket/src/main.rs @@ -0,0 +1,162 @@ +mod echo; + +use iced::alignment::{self, Alignment}; +use iced::button::{self, Button}; +use iced::executor; +use iced::scrollable::{self, Scrollable}; +use iced::text_input::{self, TextInput}; +use iced::{ + Application, Color, Column, Command, Container, Element, Length, Row, + Settings, Subscription, Text, +}; + +pub fn main() -> iced::Result { + WebSocket::run(Settings::default()) +} + +#[derive(Default)] +struct WebSocket { + messages: Vec<echo::Message>, + message_log: scrollable::State, + new_message: String, + new_message_state: text_input::State, + new_message_button: button::State, + state: State, +} + +#[derive(Debug, Clone)] +enum Message { + NewMessageChanged(String), + Send(echo::Message), + Echo(echo::Event), + Server, +} + +impl Application for WebSocket { + type Message = Message; + type Flags = (); + type Executor = executor::Default; + + fn new(_flags: Self::Flags) -> (Self, Command<Message>) { + ( + Self::default(), + Command::perform(echo::server::run(), |_| Message::Server), + ) + } + + fn title(&self) -> String { + String::from("WebSocket - Iced") + } + + fn update(&mut self, message: Message) -> Command<Message> { + match message { + Message::NewMessageChanged(new_message) => { + self.new_message = new_message; + } + Message::Send(message) => match &mut self.state { + State::Connected(connection) => { + self.new_message.clear(); + + connection.send(message); + } + State::Disconnected => {} + }, + Message::Echo(event) => match event { + echo::Event::Connected(connection) => { + self.state = State::Connected(connection); + + self.messages.push(echo::Message::connected()); + } + echo::Event::Disconnected => { + self.state = State::Disconnected; + + self.messages.push(echo::Message::disconnected()); + } + echo::Event::MessageReceived(message) => { + self.messages.push(message); + self.message_log.snap_to(1.0); + } + }, + Message::Server => {} + } + + Command::none() + } + + fn subscription(&self) -> Subscription<Message> { + echo::connect().map(Message::Echo) + } + + fn view(&mut self) -> Element<Message> { + let message_log = if self.messages.is_empty() { + Container::new( + Text::new("Your messages will appear here...") + .color(Color::from_rgb8(0x88, 0x88, 0x88)), + ) + .width(Length::Fill) + .height(Length::Fill) + .center_x() + .center_y() + .into() + } else { + self.messages + .iter() + .cloned() + .fold( + Scrollable::new(&mut self.message_log), + |scrollable, message| scrollable.push(Text::new(message)), + ) + .width(Length::Fill) + .height(Length::Fill) + .spacing(10) + .into() + }; + + let new_message_input = { + let mut input = TextInput::new( + &mut self.new_message_state, + "Type a message...", + &self.new_message, + Message::NewMessageChanged, + ) + .padding(10); + + let mut button = Button::new( + &mut self.new_message_button, + Text::new("Send") + .height(Length::Fill) + .vertical_alignment(alignment::Vertical::Center), + ) + .padding([0, 20]); + + if matches!(self.state, State::Connected(_)) { + if let Some(message) = echo::Message::new(&self.new_message) { + input = input.on_submit(Message::Send(message.clone())); + button = button.on_press(Message::Send(message)); + } + } + + Row::with_children(vec![input.into(), button.into()]) + .spacing(10) + .align_items(Alignment::Fill) + }; + + Column::with_children(vec![message_log, new_message_input.into()]) + .width(Length::Fill) + .height(Length::Fill) + .padding(20) + .spacing(10) + .into() + } +} + +enum State { + Disconnected, + Connected(echo::Connection), +} + +impl Default for State { + fn default() -> Self { + Self::Disconnected + } +} diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 2950879c..63834654 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -1,8 +1,12 @@ //! Listen to external events in your application. use crate::event::{self, Event}; use crate::Hasher; + +use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; +use std::hash::Hash; + /// A request to listen to external events. /// /// Besides performing async actions on demand with [`Command`], most @@ -29,20 +33,14 @@ pub type Tracker = pub use iced_futures::subscription::Recipe; -mod events; - -use events::Events; - /// Returns a [`Subscription`] to all the runtime events. /// /// This subscription will notify your application of any [`Event`] that was /// not captured by any widget. pub fn events() -> Subscription<Event> { - Subscription::from_recipe(Events { - f: |event, status| match status { - event::Status::Ignored => Some(event), - event::Status::Captured => None, - }, + events_with(|event, status| match status { + event::Status::Ignored => Some(event), + event::Status::Captured => None, }) } @@ -60,5 +58,151 @@ pub fn events_with<Message>( where Message: 'static + Send, { - Subscription::from_recipe(Events { f }) + Subscription::from_recipe(Runner { + id: f, + spawn: move |events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(f(event, status)) + }) + }, + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run the +/// given [`Stream`]. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message> +where + I: Hash + 'static, + S: Stream<Item = Message> + Send + 'static, + Message: 'static, +{ + Subscription::from_recipe(Runner { + id, + spawn: move |_| stream, + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_native::subscription::{self, Subscription}; +/// use iced_native::futures::channel::mpsc; +/// +/// pub enum Event { +/// Ready(mpsc::Sender<Input>), +/// WorkFinished, +/// // ... +/// } +/// +/// enum Input { +/// DoSomeWork, +/// // ... +/// } +/// +/// enum State { +/// Starting, +/// Ready(mpsc::Receiver<Input>), +/// } +/// +/// fn some_worker() -> Subscription<Event> { +/// struct SomeWorker; +/// +/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move { +/// match state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// (Some(Event::Ready(sender)), State::Ready(receiver)) +/// } +/// State::Ready(mut receiver) => { +/// use iced_native::futures::StreamExt; +/// +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; +/// +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... +/// +/// // Finally, we can optionally return a message to tell the +/// // `Application` the work is done +/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// } +/// } +/// } +/// } +/// }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.4/examples/websocket +pub fn unfold<I, T, Fut, Message>( + id: I, + initial: T, + mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, +) -> Subscription<Message> +where + I: Hash + 'static, + T: Send + 'static, + Fut: Future<Output = (Option<Message>, T)> + Send + 'static, + Message: 'static + Send, +{ + use futures::future::{self, FutureExt}; + use futures::stream::StreamExt; + + run( + id, + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready), + ) +} + +struct Runner<I, F, S, Message> +where + F: FnOnce(EventStream) -> S, + S: Stream<Item = Message>, +{ + id: I, + spawn: F, +} + +impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)> + for Runner<I, F, S, Message> +where + I: Hash + 'static, + F: FnOnce(EventStream) -> S, + S: Stream<Item = Message> + Send + 'static, +{ + type Output = Message; + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::<I>().hash(state); + self.id.hash(state); + } + + fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { + use futures::stream::StreamExt; + + (self.spawn)(input).boxed() + } } diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs deleted file mode 100644 index ca143bb3..00000000 --- a/native/src/subscription/events.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::event::{self, Event}; -use crate::subscription::{EventStream, Recipe}; -use crate::Hasher; -use iced_futures::futures::future; -use iced_futures::futures::StreamExt; -use iced_futures::BoxStream; - -pub struct Events<Message> { - pub(super) f: fn(Event, event::Status) -> Option<Message>, -} - -impl<Message> Recipe<Hasher, (Event, event::Status)> for Events<Message> -where - Message: 'static + Send, -{ - type Output = Message; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - struct Marker; - std::any::TypeId::of::<Marker>().hash(state); - self.f.hash(state); - } - - fn stream( - self: Box<Self>, - event_stream: EventStream, - ) -> BoxStream<Self::Output> { - let stream = event_stream.filter_map(move |(event, status)| { - future::ready((self.f)(event, status)) - }); - iced_futures::boxed_stream(stream) - } -} |