diff options
author | 2022-01-16 15:50:19 +0700 | |
---|---|---|
committer | 2022-01-16 15:50:19 +0700 | |
commit | dc50a2830ab553dfc5dfc28d4fd1af6b3981c656 (patch) | |
tree | 4ff8bce15f88a0ca610551fdd81afa968f7fda58 | |
parent | 2f557731f313ccc94b5d0ebb4ee5603624670daf (diff) | |
download | iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.gz iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.bz2 iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.zip |
Draft `websocket` example :tada:
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | examples/download_progress/src/download.rs | 1 | ||||
-rw-r--r-- | examples/websocket/Cargo.toml | 22 | ||||
-rw-r--r-- | examples/websocket/README.md | 12 | ||||
-rw-r--r-- | examples/websocket/src/echo.rs | 146 | ||||
-rw-r--r-- | examples/websocket/src/echo/server.rs | 57 | ||||
-rw-r--r-- | examples/websocket/src/main.rs | 162 | ||||
-rw-r--r-- | native/src/subscription.rs | 98 |
8 files changed, 426 insertions, 73 deletions
@@ -88,6 +88,7 @@ members = [ "examples/tooltip", "examples/tour", "examples/url_handler", + "examples/websocket", ] [dependencies] diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index 06d5d3fd..20682e7a 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -10,6 +10,7 @@ pub fn file<I: 'static + Hash + Copy + Send, T: ToString>( url: T, ) -> iced::Subscription<(I, Progress)> { subscription::run( + id, Download { id, url: url.to_string(), diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml new file mode 100644 index 00000000..6b4d9d10 --- /dev/null +++ b/examples/websocket/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "websocket" +version = "0.1.0" +authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"] +edition = "2018" +publish = false + +[dependencies] +iced = { path = "../..", features = ["tokio", "debug"] } +iced_native = { path = "../../native" } +iced_futures = { path = "../../futures" } + +[dependencies.async-tungstenite] +version = "0.16" +features = ["tokio-rustls-webpki-roots"] + +[dependencies.tokio] +version = "1" +features = ["time"] + +[dependencies.warp] +version = "0.3" diff --git a/examples/websocket/README.md b/examples/websocket/README.md new file mode 100644 index 00000000..fcdee9f3 --- /dev/null +++ b/examples/websocket/README.md @@ -0,0 +1,12 @@ +## Websocket + +A simple example that keeps a WebSocket connection open to an echo server. + +The __[`main`]__ file contains all the code of the example. + +You can run it with `cargo run`: +``` +cargo run --package websocket +``` + +[`main`]: src/main.rs diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs new file mode 100644 index 00000000..13596ddd --- /dev/null +++ b/examples/websocket/src/echo.rs @@ -0,0 +1,146 @@ +pub mod server; + +use iced_futures::futures; +use iced_native::subscription::{self, Subscription}; + +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::stream::StreamExt; + +use async_tungstenite::tungstenite; + +pub fn connect() -> Subscription<Event> { + struct Connect; + + subscription::unfold( + std::any::TypeId::of::<Connect>(), + State::Disconnected, + |state| async move { + match state { + State::Disconnected => { + const ECHO_SERVER: &str = "ws://localhost:3030"; + + match async_tungstenite::tokio::connect_async(ECHO_SERVER) + .await + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); + + ( + Some(Event::Connected(Connection(sender))), + State::Connected(websocket, receiver), + ) + } + Err(_) => { + let _ = tokio::time::sleep( + tokio::time::Duration::from_secs(1), + ) + .await; + + (Some(Event::Disconnected), State::Disconnected) + } + } + } + State::Connected(mut websocket, mut input) => { + let mut fused_websocket = websocket.by_ref().fuse(); + + futures::select! { + received = fused_websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + ( + Some(Event::MessageReceived(Message::User(message))), + State::Connected(websocket, input) + ) + } + Ok(_) => { + (None, State::Connected(websocket, input)) + } + Err(_) => { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(String::from(message))).await; + + if result.is_ok() { + (None, State::Connected(websocket, input)) + } else { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + } + } + }, + ) +} + +#[derive(Debug)] +enum State { + Disconnected, + Connected( + async_tungstenite::WebSocketStream< + async_tungstenite::tokio::ConnectStream, + >, + mpsc::Receiver<Message>, + ), +} + +#[derive(Debug, Clone)] +pub enum Event { + Connected(Connection), + Disconnected, + MessageReceived(Message), +} + +#[derive(Debug, Clone)] +pub struct Connection(mpsc::Sender<Message>); + +impl Connection { + pub fn send(&mut self, message: Message) { + let _ = self + .0 + .try_send(message) + .expect("Send message to echo server"); + } +} + +#[derive(Debug, Clone)] +pub enum Message { + Connected, + Disconnected, + User(String), +} + +impl Message { + pub fn new(message: &str) -> Option<Self> { + if message.is_empty() { + None + } else { + Some(Self::User(message.to_string())) + } + } + + pub fn connected() -> Self { + Message::Connected + } + + pub fn disconnected() -> Self { + Message::Disconnected + } +} + +impl From<Message> for String { + fn from(message: Message) -> Self { + match message { + Message::Connected => String::from("Connected successfully!"), + Message::Disconnected => { + String::from("Connection lost... Retrying...") + } + Message::User(message) => message, + } + } +} diff --git a/examples/websocket/src/echo/server.rs b/examples/websocket/src/echo/server.rs new file mode 100644 index 00000000..7702d417 --- /dev/null +++ b/examples/websocket/src/echo/server.rs @@ -0,0 +1,57 @@ +use iced_futures::futures; + +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use warp::ws::WebSocket; +use warp::Filter; + +// Basic WebSocket echo server adapted from: +// https://github.com/seanmonstar/warp/blob/3ff2eaf41eb5ac9321620e5a6434d5b5ec6f313f/examples/websockets_chat.rs +// +// Copyright (c) 2018-2020 Sean McArthur +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +pub async fn run() { + let routes = warp::path::end().and(warp::ws()).map(|ws: warp::ws::Ws| { + ws.on_upgrade(move |socket| user_connected(socket)) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn user_connected(ws: WebSocket) { + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + let (mut tx, mut rx) = mpsc::unbounded(); + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + let _ = user_ws_tx.send(message).await.unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }); + } + }); + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(_) => break, + }; + + let _ = tx.send(msg).await; + } +} diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs new file mode 100644 index 00000000..c03a9f3a --- /dev/null +++ b/examples/websocket/src/main.rs @@ -0,0 +1,162 @@ +mod echo; + +use iced::alignment::{self, Alignment}; +use iced::button::{self, Button}; +use iced::executor; +use iced::scrollable::{self, Scrollable}; +use iced::text_input::{self, TextInput}; +use iced::{ + Application, Color, Column, Command, Container, Element, Length, Row, + Settings, Subscription, Text, +}; + +pub fn main() -> iced::Result { + WebSocket::run(Settings::default()) +} + +#[derive(Default)] +struct WebSocket { + messages: Vec<echo::Message>, + message_log: scrollable::State, + new_message: String, + new_message_state: text_input::State, + new_message_button: button::State, + state: State, +} + +#[derive(Debug, Clone)] +enum Message { + NewMessageChanged(String), + Send(echo::Message), + Echo(echo::Event), + Server, +} + +impl Application for WebSocket { + type Message = Message; + type Flags = (); + type Executor = executor::Default; + + fn new(_flags: Self::Flags) -> (Self, Command<Message>) { + ( + Self::default(), + Command::perform(echo::server::run(), |_| Message::Server), + ) + } + + fn title(&self) -> String { + String::from("WebSocket - Iced") + } + + fn update(&mut self, message: Message) -> Command<Message> { + match message { + Message::NewMessageChanged(new_message) => { + self.new_message = new_message; + } + Message::Send(message) => match &mut self.state { + State::Connected(connection) => { + self.new_message.clear(); + + connection.send(message); + } + State::Disconnected => {} + }, + Message::Echo(event) => match event { + echo::Event::Connected(connection) => { + self.state = State::Connected(connection); + + self.messages.push(echo::Message::connected()); + } + echo::Event::Disconnected => { + self.state = State::Disconnected; + + self.messages.push(echo::Message::disconnected()); + } + echo::Event::MessageReceived(message) => { + self.messages.push(message); + self.message_log.snap_to(1.0); + } + }, + Message::Server => {} + } + + Command::none() + } + + fn subscription(&self) -> Subscription<Message> { + echo::connect().map(Message::Echo) + } + + fn view(&mut self) -> Element<Message> { + let message_log = if self.messages.is_empty() { + Container::new( + Text::new("Your messages will appear here...") + .color(Color::from_rgb8(0x88, 0x88, 0x88)), + ) + .width(Length::Fill) + .height(Length::Fill) + .center_x() + .center_y() + .into() + } else { + self.messages + .iter() + .cloned() + .fold( + Scrollable::new(&mut self.message_log), + |scrollable, message| scrollable.push(Text::new(message)), + ) + .width(Length::Fill) + .height(Length::Fill) + .spacing(10) + .into() + }; + + let new_message_input = { + let mut input = TextInput::new( + &mut self.new_message_state, + "Type a message...", + &self.new_message, + Message::NewMessageChanged, + ) + .padding(10); + + let mut button = Button::new( + &mut self.new_message_button, + Text::new("Send") + .height(Length::Fill) + .vertical_alignment(alignment::Vertical::Center), + ) + .padding([0, 20]); + + if matches!(self.state, State::Connected(_)) { + if let Some(message) = echo::Message::new(&self.new_message) { + input = input.on_submit(Message::Send(message.clone())); + button = button.on_press(Message::Send(message)); + } + } + + Row::with_children(vec![input.into(), button.into()]) + .spacing(10) + .align_items(Alignment::Fill) + }; + + Column::with_children(vec![message_log, new_message_input.into()]) + .width(Length::Fill) + .height(Length::Fill) + .padding(20) + .spacing(10) + .into() + } +} + +enum State { + Disconnected, + Connected(echo::Connection), +} + +impl Default for State { + fn default() -> Self { + Self::Disconnected + } +} diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 85ad96fa..70cd269e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,6 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::channel::mpsc; use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; @@ -59,21 +58,9 @@ pub fn events_with<Message>( where Message: 'static + Send, { - #[derive(Debug, Clone, Copy, Hash)] - struct Events(u64); - - let hash = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - - f.hash(&mut hasher); - - hasher.finish() - }; - Subscription::from_recipe(Runner { - initial: Events(hash), + id: f, + initial: (), spawn: move |_, events| { use futures::future; use futures::stream::StreamExt; @@ -89,16 +76,19 @@ where /// [`Stream`] returned by the provided closure. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run<T, S, Message>( +pub fn run<I, T, S, Message>( + id: I, initial: T, f: impl FnOnce(T) -> S + 'static, ) -> Subscription<Message> where - Message: 'static, - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, S: Stream<Item = Message> + Send + 'static, + Message: 'static, { Subscription::from_recipe(Runner { + id, initial, spawn: move |initial, _| f(initial), }) @@ -108,79 +98,41 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn unfold<T, Fut, Message>( +pub fn unfold<I, T, Fut, Message>( + id: I, initial: T, mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, ) -> Subscription<Message> where - Message: 'static, - T: Clone + Hash + Send + 'static, - Fut: Future<Output = (Message, T)> + Send + 'static, -{ - use futures::future::FutureExt; - - run(initial, move |initial| { - futures::stream::unfold(initial, move |state| f(state).map(Some)) - }) -} - -/// Returns a [`Subscription`] that will open a channel and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// When the [`Subscription`] starts, an `on_ready` message will be produced -/// containing the [`mpsc::Sender`] end of the channel, which can be used by -/// the parent application to send `Input` to the running [`Subscription`]. -/// -/// The provided closure should use the [`mpsc::Receiver`] argument to await for -/// any `Input`. -/// -/// This function is really useful to create asynchronous workers with -/// bidirectional communication with a parent application. -/// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn worker<T, Fut, Message, Input>( - initial: T, - on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static, - f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static, -) -> Subscription<Message> -where - T: Clone + Hash + Send + 'static, - Fut: Future<Output = (Message, T)> + Send + 'static, - Message: Send + 'static, - Input: Send + 'static, + I: Hash + 'static, + T: Send + 'static, + Fut: Future<Output = (Option<Message>, T)> + Send + 'static, + Message: 'static + Send, { - use futures::future; + use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(initial, move |initial| { - let (sender, receiver) = mpsc::channel(100); - - futures::stream::once(future::ready(on_ready(sender))).chain( - futures::stream::unfold( - (f, initial, receiver), - move |(mut f, state, mut receiver)| async { - let (message, state) = f(state, &mut receiver).await; - - Some((message, (f, state, receiver))) - }, - ), - ) + run(id, initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready) }) } -struct Runner<T, F, S, Message> +struct Runner<I, T, F, S, Message> where F: FnOnce(T, EventStream) -> S, S: Stream<Item = Message>, { + id: I, initial: T, spawn: F, } -impl<T, S, F, Message> Recipe<Hasher, (Event, event::Status)> - for Runner<T, F, S, Message> +impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)> + for Runner<I, T, F, S, Message> where - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, F: FnOnce(T, EventStream) -> S, S: Stream<Item = Message> + Send + 'static, { @@ -189,7 +141,7 @@ where fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::<T>().hash(state); - self.initial.hash(state); + self.id.hash(state); } fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { |