diff options
author | 2025-04-13 16:45:53 +0100 | |
---|---|---|
committer | 2025-04-13 16:45:53 +0100 | |
commit | cf51dcf052af89f8742d887bde2c93d735309bdd (patch) | |
tree | fb7d8b7a313511de845db6cfd186f3b87c472154 | |
parent | 603777c5f4b3f8248378af6fe5af65b54e0f120e (diff) | |
download | luz-wasm.tar.gz luz-wasm.tar.bz2 luz-wasm.zip |
feat(lampada): wasm + websockets supportwasm
-rw-r--r-- | lampada/Cargo.toml | 11 | ||||
-rw-r--r-- | lampada/src/connection/mod.rs | 21 | ||||
-rw-r--r-- | lampada/src/connection/read.rs | 28 | ||||
-rw-r--r-- | lampada/src/connection/write.rs | 33 |
4 files changed, 58 insertions, 35 deletions
diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml index c68f9c6..22f133d 100644 --- a/lampada/Cargo.toml +++ b/lampada/Cargo.toml @@ -7,8 +7,15 @@ edition = "2021" futures = "0.3.31" luz = { version = "0.1.0", path = "../luz" } peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" } -jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } +jid = { version = "0.1.0", path = "../jid" } stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } -tokio = "1.42.0" +tokio = { version = "1.42.0", features = ["macros"] } tracing = "0.1.41" thiserror = "2.0.11" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { version = "1.42.0", features = ["macros", "rt", "time"] } +wasm-bindgen-futures = "0.4" + +# [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +# tokio = { version = "1.42.0", features = ["rt-multi-thread"] } diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 2d570ae..a3dde16 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use jid::JID; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use luz::jabber_stream::bound_stream::BoundJabberStream; use read::{ReadControl, ReadControlHandle, ReadState}; use stanza::{client::Stanza, stream_error::Error as StreamError}; use tokio::{ @@ -86,13 +86,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { info!("sent disconnect command"); tokio::select! { _ = async { tokio::join!( - async { let _ = (&mut self.write_control_handle.handle).await; }, - async { let _ = (&mut self.read_control_handle.handle).await; } + // async { let _ = (&mut self.write_control_handle.handle).await; }, + // async { let _ = (&mut self.read_control_handle.handle).await; } ) } => {}, // TODO: config timeout _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { - (&mut self.read_control_handle.handle).abort(); - (&mut self.write_control_handle.handle).abort(); + // (&mut self.read_control_handle.handle).abort(); + // (&mut self.write_control_handle.handle).abort(); } } info!("disconnected"); @@ -257,7 +257,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { pub struct SupervisorHandle { sender: SupervisorSender, - handle: JoinHandle<()>, + // TODO: is not having handles fine? + // handle: JoinHandle<()>, } impl Deref for SupervisorHandle { @@ -295,7 +296,7 @@ impl DerefMut for SupervisorSender { impl SupervisorHandle { pub fn new<Lgc: Logic + Clone + Send + 'static>( - streams: BoundJabberStream<Tls>, + streams: BoundJabberStream, on_crash: oneshot::Sender<()>, jid: JID, server: JID, @@ -341,13 +342,15 @@ impl SupervisorHandle { logic, ); - let handle = tokio::spawn(async move { actor.run().await }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run().await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run().await }); ( write_handle, Self { sender: supervisor_sender, - handle, }, ) } diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 2c7eb58..4451d99 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -8,7 +8,7 @@ use std::{ }; use futures::{future::Fuse, FutureExt}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use luz::jabber_stream::bound_stream::BoundJabberReader; use stanza::client::Stanza; use stanza::stream::Error as StreamErrorStanza; use stanza::stream_error::Error as StreamError; @@ -24,7 +24,7 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor pub struct Read<Lgc> { - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, disconnecting: bool, disconnect_timedout: Fuse<oneshot::Receiver<()>>, @@ -51,7 +51,7 @@ pub struct ReadState { impl<Lgc> Read<Lgc> { fn new( - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, tasks: JoinSet<()>, connected: Connected, logic: Lgc, @@ -129,7 +129,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { let stream_error = match e { peanuts::Error::ReadError(error) => None, peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), - peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), + peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat), peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), @@ -140,6 +140,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), + _ => None, }; let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); @@ -169,7 +170,8 @@ pub enum ReadControl { pub struct ReadControlHandle { sender: mpsc::Sender<ReadControl>, - pub(crate) handle: JoinHandle<()>, + // TODO: same here + // pub(crate) handle: JoinHandle<()>, } impl Deref for ReadControlHandle { @@ -188,7 +190,7 @@ impl DerefMut for ReadControlHandle { impl ReadControlHandle { pub fn new<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, @@ -205,16 +207,18 @@ impl ReadControlHandle { control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run().await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, tasks: JoinSet<()>, connected: Connected, logic: Lgc, @@ -232,11 +236,13 @@ impl ReadControlHandle { control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run().await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } } diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 1070cdf..4c6ed24 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use luz::jabber_stream::bound_stream::BoundJabberWriter; use stanza::{ client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError, }; @@ -13,7 +13,7 @@ use crate::error::WriteError; /// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. pub struct Write { - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, /// connection session write queue stanza_receiver: mpsc::Receiver<WriteMessage>, @@ -41,7 +41,7 @@ pub enum WriteControl { impl Write { fn new( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, stanza_receiver: mpsc::Receiver<WriteMessage>, control_receiver: mpsc::Receiver<WriteControl>, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, @@ -192,7 +192,7 @@ impl DerefMut for WriteHandle { pub struct WriteControlHandle { sender: mpsc::Sender<WriteControl>, - pub(crate) handle: JoinHandle<()>, + // pub(crate) handle: JoinHandle<()>, } impl Deref for WriteControlHandle { @@ -211,14 +211,17 @@ impl DerefMut for WriteControlHandle { impl WriteControlHandle { pub fn new( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, ) -> (WriteHandle, Self) { let (control_sender, control_receiver) = mpsc::channel(20); let (stanza_sender, stanza_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run().await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run().await }); ( WriteHandle { @@ -226,13 +229,12 @@ impl WriteControlHandle { }, Self { sender: control_sender, - handle, }, ) } pub fn reconnect_retry( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver<WriteMessage>, retry_msg: WriteMessage, @@ -240,27 +242,32 @@ impl WriteControlHandle { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); Self { sender: control_sender, - handle, } } pub fn reconnect( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver<WriteMessage>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); + + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { actor.run().await }); + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } } |