diff options
Diffstat (limited to 'lampada/src/connection/read.rs')
| -rw-r--r-- | lampada/src/connection/read.rs | 75 |
1 files changed, 48 insertions, 27 deletions
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 2c7eb58..ebcf66a 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -1,3 +1,7 @@ +// SPDX-FileCopyrightText: 2025 cel <cel@bunny.garden> +// +// SPDX-License-Identifier: AGPL-3.0-or-later + use std::{ collections::HashMap, marker::PhantomData, @@ -8,7 +12,7 @@ use std::{ }; use futures::{future::Fuse, FutureExt}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use luz::jabber_stream::bound_stream::BoundJabberReader; use stanza::client::Stanza; use stanza::stream::Error as StreamErrorStanza; use stanza::stream_error::Error as StreamError; @@ -16,6 +20,8 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use crate::{Connected, Logic, WriteMessage}; @@ -24,12 +30,12 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor pub struct Read<Lgc> { - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, disconnecting: bool, disconnect_timedout: Fuse<oneshot::Receiver<()>>, // all the threads spawned by the current connection session - tasks: JoinSet<()>, + // tasks: JoinSet<()>, // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) @@ -46,13 +52,13 @@ pub struct Read<Lgc> { pub struct ReadState { pub supervisor_control: SupervisorSender, // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, + // pub tasks: JoinSet<()>, } impl<Lgc> Read<Lgc> { fn new( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, + stream: BoundJabberReader, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, @@ -64,7 +70,7 @@ impl<Lgc> Read<Lgc> { stream, disconnecting: false, disconnect_timedout: recv.fuse(), - tasks, + // tasks, connected, logic, supervisor_control, @@ -74,7 +80,12 @@ impl<Lgc> Read<Lgc> { } } -impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { +impl<Lgc> Read<Lgc> +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::<Stanza>().await; @@ -100,7 +111,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { }) }, ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control }); break; }, }; @@ -112,11 +123,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { match s { Stanza::Error(error) => { self.logic.clone().handle_stream_error(error).await; - self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await; break; }, _ => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); } }; }, @@ -129,7 +140,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { let stream_error = match e { peanuts::Error::ReadError(error) => None, peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), - peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), + peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat), peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), @@ -140,9 +151,10 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), + _ => None, }; - let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control })); } break; }, @@ -169,7 +181,8 @@ pub enum ReadControl { pub struct ReadControlHandle { sender: mpsc::Sender<ReadControl>, - pub(crate) handle: JoinHandle<()>, + // TODO: same here + // pub(crate) handle: JoinHandle<()>, } impl Deref for ReadControlHandle { @@ -187,56 +200,64 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, + pub fn new<Lgc>( + stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - JoinSet::new(), + // JoinSet::new(), connected, logic, supervisor_control, control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } - pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, + pub fn reconnect<Lgc>( + stream: BoundJabberReader, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - tasks, + // tasks, connected, logic, supervisor_control, control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } } |
