summaryrefslogtreecommitdiffstats
path: root/examples/websocket/src/echo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/websocket/src/echo.rs')
-rw-r--r--examples/websocket/src/echo.rs146
1 files changed, 146 insertions, 0 deletions
diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs
new file mode 100644
index 00000000..13596ddd
--- /dev/null
+++ b/examples/websocket/src/echo.rs
@@ -0,0 +1,146 @@
+pub mod server;
+
+use iced_futures::futures;
+use iced_native::subscription::{self, Subscription};
+
+use futures::channel::mpsc;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+
+use async_tungstenite::tungstenite;
+
+pub fn connect() -> Subscription<Event> {
+ struct Connect;
+
+ subscription::unfold(
+ std::any::TypeId::of::<Connect>(),
+ State::Disconnected,
+ |state| async move {
+ match state {
+ State::Disconnected => {
+ const ECHO_SERVER: &str = "ws://localhost:3030";
+
+ match async_tungstenite::tokio::connect_async(ECHO_SERVER)
+ .await
+ {
+ Ok((websocket, _)) => {
+ let (sender, receiver) = mpsc::channel(100);
+
+ (
+ Some(Event::Connected(Connection(sender))),
+ State::Connected(websocket, receiver),
+ )
+ }
+ Err(_) => {
+ let _ = tokio::time::sleep(
+ tokio::time::Duration::from_secs(1),
+ )
+ .await;
+
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+ State::Connected(mut websocket, mut input) => {
+ let mut fused_websocket = websocket.by_ref().fuse();
+
+ futures::select! {
+ received = fused_websocket.select_next_some() => {
+ match received {
+ Ok(tungstenite::Message::Text(message)) => {
+ (
+ Some(Event::MessageReceived(Message::User(message))),
+ State::Connected(websocket, input)
+ )
+ }
+ Ok(_) => {
+ (None, State::Connected(websocket, input))
+ }
+ Err(_) => {
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+
+ message = input.select_next_some() => {
+ let result = websocket.send(tungstenite::Message::Text(String::from(message))).await;
+
+ if result.is_ok() {
+ (None, State::Connected(websocket, input))
+ } else {
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+ }
+ }
+ },
+ )
+}
+
+#[derive(Debug)]
+enum State {
+ Disconnected,
+ Connected(
+ async_tungstenite::WebSocketStream<
+ async_tungstenite::tokio::ConnectStream,
+ >,
+ mpsc::Receiver<Message>,
+ ),
+}
+
+#[derive(Debug, Clone)]
+pub enum Event {
+ Connected(Connection),
+ Disconnected,
+ MessageReceived(Message),
+}
+
+#[derive(Debug, Clone)]
+pub struct Connection(mpsc::Sender<Message>);
+
+impl Connection {
+ pub fn send(&mut self, message: Message) {
+ let _ = self
+ .0
+ .try_send(message)
+ .expect("Send message to echo server");
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum Message {
+ Connected,
+ Disconnected,
+ User(String),
+}
+
+impl Message {
+ pub fn new(message: &str) -> Option<Self> {
+ if message.is_empty() {
+ None
+ } else {
+ Some(Self::User(message.to_string()))
+ }
+ }
+
+ pub fn connected() -> Self {
+ Message::Connected
+ }
+
+ pub fn disconnected() -> Self {
+ Message::Disconnected
+ }
+}
+
+impl From<Message> for String {
+ fn from(message: Message) -> Self {
+ match message {
+ Message::Connected => String::from("Connected successfully!"),
+ Message::Disconnected => {
+ String::from("Connection lost... Retrying...")
+ }
+ Message::User(message) => message,
+ }
+ }
+}