diff options
Diffstat (limited to '')
-rw-r--r-- | examples/websocket/src/echo.rs | 146 | ||||
-rw-r--r-- | examples/websocket/src/echo/server.rs | 57 |
2 files changed, 203 insertions, 0 deletions
diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs new file mode 100644 index 00000000..13596ddd --- /dev/null +++ b/examples/websocket/src/echo.rs @@ -0,0 +1,146 @@ +pub mod server; + +use iced_futures::futures; +use iced_native::subscription::{self, Subscription}; + +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::stream::StreamExt; + +use async_tungstenite::tungstenite; + +pub fn connect() -> Subscription<Event> { + struct Connect; + + subscription::unfold( + std::any::TypeId::of::<Connect>(), + State::Disconnected, + |state| async move { + match state { + State::Disconnected => { + const ECHO_SERVER: &str = "ws://localhost:3030"; + + match async_tungstenite::tokio::connect_async(ECHO_SERVER) + .await + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); + + ( + Some(Event::Connected(Connection(sender))), + State::Connected(websocket, receiver), + ) + } + Err(_) => { + let _ = tokio::time::sleep( + tokio::time::Duration::from_secs(1), + ) + .await; + + (Some(Event::Disconnected), State::Disconnected) + } + } + } + State::Connected(mut websocket, mut input) => { + let mut fused_websocket = websocket.by_ref().fuse(); + + futures::select! { + received = fused_websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + ( + Some(Event::MessageReceived(Message::User(message))), + State::Connected(websocket, input) + ) + } + Ok(_) => { + (None, State::Connected(websocket, input)) + } + Err(_) => { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(String::from(message))).await; + + if result.is_ok() { + (None, State::Connected(websocket, input)) + } else { + (Some(Event::Disconnected), State::Disconnected) + } + } + } + } + } + }, + ) +} + +#[derive(Debug)] +enum State { + Disconnected, + Connected( + async_tungstenite::WebSocketStream< + async_tungstenite::tokio::ConnectStream, + >, + mpsc::Receiver<Message>, + ), +} + +#[derive(Debug, Clone)] +pub enum Event { + Connected(Connection), + Disconnected, + MessageReceived(Message), +} + +#[derive(Debug, Clone)] +pub struct Connection(mpsc::Sender<Message>); + +impl Connection { + pub fn send(&mut self, message: Message) { + let _ = self + .0 + .try_send(message) + .expect("Send message to echo server"); + } +} + +#[derive(Debug, Clone)] +pub enum Message { + Connected, + Disconnected, + User(String), +} + +impl Message { + pub fn new(message: &str) -> Option<Self> { + if message.is_empty() { + None + } else { + Some(Self::User(message.to_string())) + } + } + + pub fn connected() -> Self { + Message::Connected + } + + pub fn disconnected() -> Self { + Message::Disconnected + } +} + +impl From<Message> for String { + fn from(message: Message) -> Self { + match message { + Message::Connected => String::from("Connected successfully!"), + Message::Disconnected => { + String::from("Connection lost... Retrying...") + } + Message::User(message) => message, + } + } +} diff --git a/examples/websocket/src/echo/server.rs b/examples/websocket/src/echo/server.rs new file mode 100644 index 00000000..7702d417 --- /dev/null +++ b/examples/websocket/src/echo/server.rs @@ -0,0 +1,57 @@ +use iced_futures::futures; + +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use warp::ws::WebSocket; +use warp::Filter; + +// Basic WebSocket echo server adapted from: +// https://github.com/seanmonstar/warp/blob/3ff2eaf41eb5ac9321620e5a6434d5b5ec6f313f/examples/websockets_chat.rs +// +// Copyright (c) 2018-2020 Sean McArthur +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +pub async fn run() { + let routes = warp::path::end().and(warp::ws()).map(|ws: warp::ws::Ws| { + ws.on_upgrade(move |socket| user_connected(socket)) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn user_connected(ws: WebSocket) { + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + let (mut tx, mut rx) = mpsc::unbounded(); + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + let _ = user_ws_tx.send(message).await.unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }); + } + }); + + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(_) => break, + }; + + let _ = tx.send(msg).await; + } +} |