From b9d75f38743113c054be3d97af36bdd2a7dd0d69 Mon Sep 17 00:00:00 2001 From: cel 🌸 Date: Thu, 17 Apr 2025 11:03:51 +0100 Subject: feat(filamento): compiles on wasm --- lampada/src/connection/read.rs | 55 +++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 22 deletions(-) (limited to 'lampada/src/connection/read.rs') diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 4451d99..591a2cb 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -16,6 +16,8 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use crate::{Connected, Logic, WriteMessage}; @@ -29,7 +31,7 @@ pub struct Read { disconnect_timedout: Fuse>, // all the threads spawned by the current connection session - tasks: JoinSet<()>, + // tasks: JoinSet<()>, // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) @@ -46,13 +48,13 @@ pub struct Read { pub struct ReadState { pub supervisor_control: SupervisorSender, // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, + // pub tasks: JoinSet<()>, } impl Read { fn new( stream: BoundJabberReader, - tasks: JoinSet<()>, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, @@ -64,7 +66,7 @@ impl Read { stream, disconnecting: false, disconnect_timedout: recv.fuse(), - tasks, + // tasks, connected, logic, supervisor_control, @@ -74,7 +76,12 @@ impl Read { } } -impl Read { +impl Read +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::().await; @@ -100,7 +107,7 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control }); break; }, }; @@ -112,11 +119,11 @@ impl Read { match s { Stanza::Error(error) => { self.logic.clone().handle_stream_error(error).await; - self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await; break; }, _ => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); } }; }, @@ -143,7 +150,7 @@ impl Read { _ => None, }; - let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control })); } break; }, @@ -189,27 +196,29 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new( + pub fn new( stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - JoinSet::new(), + // JoinSet::new(), connected, logic, supervisor_control, control_receiver, on_crash, ); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); Self { @@ -217,28 +226,30 @@ impl ReadControlHandle { } } - pub fn reconnect( + pub fn reconnect( stream: BoundJabberReader, - tasks: JoinSet<()>, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - tasks, + // tasks, connected, logic, supervisor_control, control_receiver, on_crash, ); - #[cfg(target_arch = "wasm32")] - wasm_bindgen_futures::spawn_local(async move { actor.run().await }); - #[cfg(not(target_arch = "wasm32"))] tokio::spawn(async move { actor.run().await }); Self { -- cgit