From b9d75f38743113c054be3d97af36bdd2a7dd0d69 Mon Sep 17 00:00:00 2001 From: cel 🌸 Date: Thu, 17 Apr 2025 11:03:51 +0100 Subject: feat(filamento): compiles on wasm --- lampada/Cargo.toml | 6 ++-- lampada/src/connection/mod.rs | 27 ++++++++++----- lampada/src/connection/read.rs | 55 ++++++++++++++++++------------ lampada/src/connection/write.rs | 11 ++---- lampada/src/error.rs | 11 +++--- lampada/src/lib.rs | 74 +++++++++++++++++++++++++++++++++++++---- 6 files changed, 130 insertions(+), 54 deletions(-) (limited to 'lampada') diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml index 22f133d..090aa9f 100644 --- a/lampada/Cargo.toml +++ b/lampada/Cargo.toml @@ -9,13 +9,13 @@ luz = { version = "0.1.0", path = "../luz" } peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" } jid = { version = "0.1.0", path = "../jid" } stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } -tokio = { version = "1.42.0", features = ["macros"] } +tokio = { workspace = true, features = ["macros", "sync"] } tracing = "0.1.41" thiserror = "2.0.11" [target.'cfg(target_arch = "wasm32")'.dependencies] -tokio = { version = "1.42.0", features = ["macros", "rt", "time"] } -wasm-bindgen-futures = "0.4" +tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] } +tokio_with_wasm = { version = "0.8.2", features = ["macros", "rt", "time", "sync"] } # [target.'cfg(not(target_arch = "wasm32"))'.dependencies] # tokio = { version = "1.42.0", features = ["rt-multi-thread"] } diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index a3dde16..3a3187f 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -15,6 +15,8 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; @@ -46,7 +48,12 @@ pub enum SupervisorCommand { Reconnect(ReadState), } -impl Supervisor { +impl Supervisor +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ fn new( command_recv: mpsc::Receiver, reader_crash: oneshot::Receiver<(Option, ReadState)>, @@ -129,7 +136,7 @@ impl Supervisor { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -177,7 +184,7 @@ impl Supervisor { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -225,7 +232,7 @@ impl Supervisor { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -295,14 +302,19 @@ impl DerefMut for SupervisorSender { } impl SupervisorHandle { - pub fn new( + pub fn new( streams: BoundJabberStream, on_crash: oneshot::Sender<()>, jid: JID, server: JID, password: Arc, logic: Lgc, - ) -> (WriteHandle, Self) { + ) -> (WriteHandle, Self) + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (command_send, command_recv) = mpsc::channel(20); let (writer_crash_send, writer_crash_recv) = oneshot::channel(); let (reader_crash_send, reader_crash_recv) = oneshot::channel(); @@ -342,9 +354,6 @@ impl SupervisorHandle { logic, ); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); ( diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 4451d99..591a2cb 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -16,6 +16,8 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use crate::{Connected, Logic, WriteMessage}; @@ -29,7 +31,7 @@ pub struct Read { disconnect_timedout: Fuse>, // all the threads spawned by the current connection session - tasks: JoinSet<()>, + // tasks: JoinSet<()>, // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) @@ -46,13 +48,13 @@ pub struct Read { pub struct ReadState { pub supervisor_control: SupervisorSender, // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, + // pub tasks: JoinSet<()>, } impl Read { fn new( stream: BoundJabberReader, - tasks: JoinSet<()>, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, @@ -64,7 +66,7 @@ impl Read { stream, disconnecting: false, disconnect_timedout: recv.fuse(), - tasks, + // tasks, connected, logic, supervisor_control, @@ -74,7 +76,12 @@ impl Read { } } -impl Read { +impl Read +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::().await; @@ -100,7 +107,7 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control }); break; }, }; @@ -112,11 +119,11 @@ impl Read { match s { Stanza::Error(error) => { self.logic.clone().handle_stream_error(error).await; - self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await; break; }, _ => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); } }; }, @@ -143,7 +150,7 @@ impl Read { _ => None, }; - let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control })); } break; }, @@ -189,27 +196,29 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new( + pub fn new( stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - JoinSet::new(), + // JoinSet::new(), connected, logic, supervisor_control, control_receiver, on_crash, ); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); Self { @@ -217,28 +226,30 @@ impl ReadControlHandle { } } - pub fn reconnect( + pub fn reconnect( stream: BoundJabberReader, - tasks: JoinSet<()>, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - tasks, + // tasks, connected, logic, supervisor_control, control_receiver, on_crash, ); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); Self { diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 4c6ed24..b982eea 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -8,6 +8,8 @@ use tokio::{ sync::{mpsc, oneshot}, task::JoinHandle, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use crate::error::WriteError; @@ -218,9 +220,6 @@ impl WriteControlHandle { let (stanza_sender, stanza_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); ( @@ -242,9 +241,6 @@ impl WriteControlHandle { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); Self { @@ -261,9 +257,6 @@ impl WriteControlHandle { let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); Self { diff --git a/lampada/src/error.rs b/lampada/src/error.rs index 8104155..40be012 100644 --- a/lampada/src/error.rs +++ b/lampada/src/error.rs @@ -1,11 +1,14 @@ use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use ::tokio::time::error::Elapsed; use stanza::client::Stanza; use thiserror::Error; -use tokio::{ - sync::{mpsc::error::SendError, oneshot::error::RecvError}, - time::error::Elapsed, -}; +use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; +#[cfg(target_arch = "wasm32")] +use tokio::time::Elapsed; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; #[derive(Debug, Error, Clone)] pub enum ConnectionError { diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index 7346c42..dacc56d 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(where_clause_attrs)] + use std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -21,6 +23,8 @@ use tokio::{ task::JoinSet, time::timeout, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::{debug, info}; use crate::connection::write::WriteHandle; @@ -58,20 +62,24 @@ pub trait Logic { type Cmd; /// run after binding to the stream (e.g. for a chat client, ) + #[cfg(not(target_arch = "wasm32"))] fn handle_connect(self, connection: Connected) -> impl std::future::Future + Send; /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(not(target_arch = "wasm32"))] fn handle_disconnect( self, connection: Connected, ) -> impl std::future::Future + Send; + #[cfg(not(target_arch = "wasm32"))] fn handle_stream_error( self, stream_error: StreamError, ) -> impl std::future::Future + Send; /// run to handle an incoming xmpp stanza + #[cfg(not(target_arch = "wasm32"))] fn handle_stanza( self, stanza: Stanza, @@ -79,6 +87,7 @@ pub trait Logic { ) -> impl std::future::Future + std::marker::Send; /// run to handle a command message when a connection is currently established + #[cfg(not(target_arch = "wasm32"))] fn handle_online( self, command: Self::Cmd, @@ -86,21 +95,67 @@ pub trait Logic { ) -> impl std::future::Future + std::marker::Send; /// run to handle a command message when disconnected + #[cfg(not(target_arch = "wasm32"))] fn handle_offline( self, command: Self::Cmd, ) -> impl std::future::Future + std::marker::Send; /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(not(target_arch = "wasm32"))] fn on_abort(self) -> impl std::future::Future + std::marker::Send; /// handle connection errors from the core client logic + #[cfg(not(target_arch = "wasm32"))] fn handle_connection_error( self, error: ConnectionError, ) -> impl std::future::Future + std::marker::Send; // async fn handle_stream_error(self, error) {} + #[cfg(target_arch = "wasm32")] + fn handle_connect(self, connection: Connected) -> impl std::future::Future; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(target_arch = "wasm32")] + fn handle_disconnect(self, connection: Connected) -> impl std::future::Future; + + #[cfg(target_arch = "wasm32")] + fn handle_stream_error( + self, + stream_error: StreamError, + ) -> impl std::future::Future; + + /// run to handle an incoming xmpp stanza + #[cfg(target_arch = "wasm32")] + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + ) -> impl std::future::Future; + + /// run to handle a command message when a connection is currently established + #[cfg(target_arch = "wasm32")] + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future; + + /// run to handle a command message when disconnected + #[cfg(target_arch = "wasm32")] + fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(target_arch = "wasm32")] + fn on_abort(self) -> impl std::future::Future; + + /// handle connection errors from the core client logic + #[cfg(target_arch = "wasm32")] + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future; } /// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core @@ -117,10 +172,15 @@ pub struct CoreClient { logic: Lgc, // config: LampConfig, // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? - tasks: JoinSet<()>, + // tasks: JoinSet<()>, } -impl CoreClient { +impl CoreClient +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ /// create a new actor pub fn new( jid: JID, @@ -137,7 +197,7 @@ impl CoreClient { receiver, connection_supervisor_shutdown, logic, - tasks: JoinSet::new(), + // tasks: JoinSet::new(), } } @@ -240,10 +300,10 @@ impl CoreClient { }, CoreClientCommand::Command(command) => { match self.connected.as_ref() { - Some((w, s)) => self - .tasks - .spawn(self.logic.clone().handle_online(command, w.clone())), - None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + Some((w, s)) => { + tokio::spawn(self.logic.clone().handle_online(command, w.clone())) + } + None => tokio::spawn(self.logic.clone().handle_offline(command)), }; } } -- cgit