aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-04-13 16:45:53 +0100
committerLibravatar cel 🌸 <cel@bunny.garden>2025-04-13 16:45:53 +0100
commitcf51dcf052af89f8742d887bde2c93d735309bdd (patch)
treefb7d8b7a313511de845db6cfd186f3b87c472154
parent603777c5f4b3f8248378af6fe5af65b54e0f120e (diff)
downloadluz-wasm.tar.gz
luz-wasm.tar.bz2
luz-wasm.zip
feat(lampada): wasm + websockets supportwasm
-rw-r--r--lampada/Cargo.toml11
-rw-r--r--lampada/src/connection/mod.rs21
-rw-r--r--lampada/src/connection/read.rs28
-rw-r--r--lampada/src/connection/write.rs33
4 files changed, 58 insertions, 35 deletions
diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml
index c68f9c6..22f133d 100644
--- a/lampada/Cargo.toml
+++ b/lampada/Cargo.toml
@@ -7,8 +7,15 @@ edition = "2021"
futures = "0.3.31"
luz = { version = "0.1.0", path = "../luz" }
peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" }
-jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
+jid = { version = "0.1.0", path = "../jid" }
stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
-tokio = "1.42.0"
+tokio = { version = "1.42.0", features = ["macros"] }
tracing = "0.1.41"
thiserror = "2.0.11"
+
+[target.'cfg(target_arch = "wasm32")'.dependencies]
+tokio = { version = "1.42.0", features = ["macros", "rt", "time"] }
+wasm-bindgen-futures = "0.4"
+
+# [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+# tokio = { version = "1.42.0", features = ["rt-multi-thread"] }
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 2d570ae..a3dde16 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -8,7 +8,7 @@ use std::{
};
use jid::JID;
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use luz::jabber_stream::bound_stream::BoundJabberStream;
use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::{client::Stanza, stream_error::Error as StreamError};
use tokio::{
@@ -86,13 +86,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
info!("sent disconnect command");
tokio::select! {
_ = async { tokio::join!(
- async { let _ = (&mut self.write_control_handle.handle).await; },
- async { let _ = (&mut self.read_control_handle.handle).await; }
+ // async { let _ = (&mut self.write_control_handle.handle).await; },
+ // async { let _ = (&mut self.read_control_handle.handle).await; }
) } => {},
// TODO: config timeout
_ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
- (&mut self.read_control_handle.handle).abort();
- (&mut self.write_control_handle.handle).abort();
+ // (&mut self.read_control_handle.handle).abort();
+ // (&mut self.write_control_handle.handle).abort();
}
}
info!("disconnected");
@@ -257,7 +257,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
pub struct SupervisorHandle {
sender: SupervisorSender,
- handle: JoinHandle<()>,
+ // TODO: is not having handles fine?
+ // handle: JoinHandle<()>,
}
impl Deref for SupervisorHandle {
@@ -295,7 +296,7 @@ impl DerefMut for SupervisorSender {
impl SupervisorHandle {
pub fn new<Lgc: Logic + Clone + Send + 'static>(
- streams: BoundJabberStream<Tls>,
+ streams: BoundJabberStream,
on_crash: oneshot::Sender<()>,
jid: JID,
server: JID,
@@ -341,13 +342,15 @@ impl SupervisorHandle {
logic,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run().await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run().await });
(
write_handle,
Self {
sender: supervisor_sender,
- handle,
},
)
}
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index 2c7eb58..4451d99 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -8,7 +8,7 @@ use std::{
};
use futures::{future::Fuse, FutureExt};
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use luz::jabber_stream::bound_stream::BoundJabberReader;
use stanza::client::Stanza;
use stanza::stream::Error as StreamErrorStanza;
use stanza::stream_error::Error as StreamError;
@@ -24,7 +24,7 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
pub struct Read<Lgc> {
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
disconnecting: bool,
disconnect_timedout: Fuse<oneshot::Receiver<()>>,
@@ -51,7 +51,7 @@ pub struct ReadState {
impl<Lgc> Read<Lgc> {
fn new(
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
@@ -129,7 +129,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
let stream_error = match e {
peanuts::Error::ReadError(error) => None,
peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding),
- peanuts::Error::ParseError(_) => Some(StreamError::BadFormat),
+ peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat),
peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),
peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),
peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed),
@@ -140,6 +140,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),
peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),
peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml),
+ _ => None,
};
let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
@@ -169,7 +170,8 @@ pub enum ReadControl {
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
- pub(crate) handle: JoinHandle<()>,
+ // TODO: same here
+ // pub(crate) handle: JoinHandle<()>,
}
impl Deref for ReadControlHandle {
@@ -188,7 +190,7 @@ impl DerefMut for ReadControlHandle {
impl ReadControlHandle {
pub fn new<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
@@ -205,16 +207,18 @@ impl ReadControlHandle {
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run().await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
@@ -232,11 +236,13 @@ impl ReadControlHandle {
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run().await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
}
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
index 1070cdf..4c6ed24 100644
--- a/lampada/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut};
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use luz::jabber_stream::bound_stream::BoundJabberWriter;
use stanza::{
client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError,
};
@@ -13,7 +13,7 @@ use crate::error::WriteError;
/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
/// connection session write queue
stanza_receiver: mpsc::Receiver<WriteMessage>,
@@ -41,7 +41,7 @@ pub enum WriteControl {
impl Write {
fn new(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
stanza_receiver: mpsc::Receiver<WriteMessage>,
control_receiver: mpsc::Receiver<WriteControl>,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
@@ -192,7 +192,7 @@ impl DerefMut for WriteHandle {
pub struct WriteControlHandle {
sender: mpsc::Sender<WriteControl>,
- pub(crate) handle: JoinHandle<()>,
+ // pub(crate) handle: JoinHandle<()>,
}
impl Deref for WriteControlHandle {
@@ -211,14 +211,17 @@ impl DerefMut for WriteControlHandle {
impl WriteControlHandle {
pub fn new(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
) -> (WriteHandle, Self) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run().await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run().await });
(
WriteHandle {
@@ -226,13 +229,12 @@ impl WriteControlHandle {
},
Self {
sender: control_sender,
- handle,
},
)
}
pub fn reconnect_retry(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
retry_msg: WriteMessage,
@@ -240,27 +242,32 @@ impl WriteControlHandle {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
Self {
sender: control_sender,
- handle,
}
}
pub fn reconnect(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
+
+ #[cfg(target_arch = "wasm32")]
+ wasm_bindgen_futures::spawn_local(async move { actor.run().await });
+ #[cfg(not(target_arch = "wasm32"))]
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
}