diff options
Diffstat (limited to 'lampada/src')
-rw-r--r-- | lampada/src/connection/mod.rs | 69 | ||||
-rw-r--r-- | lampada/src/connection/read.rs | 78 | ||||
-rw-r--r-- | lampada/src/connection/write.rs | 26 | ||||
-rw-r--r-- | lampada/src/error.rs | 15 | ||||
-rw-r--r-- | lampada/src/lib.rs | 120 |
5 files changed, 190 insertions, 118 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 2d570ae..51c3758 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -7,14 +7,16 @@ use std::{ time::Duration, }; -use jid::JID; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use jid::{BareJID, FullJID, JID}; +use luz::jabber_stream::bound_stream::BoundJabberStream; use read::{ReadControl, ReadControlHandle, ReadState}; use stanza::{client::Stanza, stream_error::Error as StreamError}; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; @@ -46,7 +48,12 @@ pub enum SupervisorCommand { Reconnect(ReadState), } -impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { +impl<Lgc> Supervisor<Lgc> +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ fn new( command_recv: mpsc::Receiver<SupervisorCommand>, reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, @@ -86,13 +93,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { info!("sent disconnect command"); tokio::select! { _ = async { tokio::join!( - async { let _ = (&mut self.write_control_handle.handle).await; }, - async { let _ = (&mut self.read_control_handle.handle).await; } + // async { let _ = (&mut self.write_control_handle.handle).await; }, + // async { let _ = (&mut self.read_control_handle.handle).await; } ) } => {}, // TODO: config timeout _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { - (&mut self.read_control_handle.handle).abort(); - (&mut self.write_control_handle.handle).abort(); + // (&mut self.read_control_handle.handle).abort(); + // (&mut self.write_control_handle.handle).abort(); } } info!("disconnected"); @@ -114,12 +121,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { break } - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); + let mut jid = self.connected.jid.clone().into(); // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that - let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&jid, &*self.password).await; match connection { - Ok(c) => { + Ok((c, full_jid)) => { let (read, write) = c.split(); let (send, recv) = oneshot::channel(); self.writer_crash = recv; @@ -129,7 +135,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -162,12 +168,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { else => break, }; - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); + let mut jid = self.connected.jid.clone().into(); // TODO: same here - let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&jid, &*self.password).await; match connection { - Ok(c) => { + Ok((c, full_jid)) => { let (read, write) = c.split(); let (send, recv) = oneshot::channel(); self.writer_crash = recv; @@ -177,7 +182,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -207,11 +212,10 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { else => break, }; - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); - let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let mut jid = self.connected.jid.clone().into(); + let connection = luz::connect_and_login(&jid, &*self.password).await; match connection { - Ok(c) => { + Ok((c, full_jid)) => { let (read, write) = c.split(); let (send, recv) = oneshot::channel(); self.writer_crash = recv; @@ -225,7 +229,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { self.reader_crash = recv; self.read_control_handle = ReadControlHandle::reconnect( read, - read_state.tasks, + // read_state.tasks, self.connected.clone(), self.logic.clone(), read_state.supervisor_control, @@ -257,7 +261,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { pub struct SupervisorHandle { sender: SupervisorSender, - handle: JoinHandle<()>, + // TODO: is not having handles fine? + // handle: JoinHandle<()>, } impl Deref for SupervisorHandle { @@ -294,14 +299,18 @@ impl DerefMut for SupervisorSender { } impl SupervisorHandle { - pub fn new<Lgc: Logic + Clone + Send + 'static>( - streams: BoundJabberStream<Tls>, + pub fn new<Lgc>( + streams: BoundJabberStream, on_crash: oneshot::Sender<()>, - jid: JID, - server: JID, + jid: FullJID, password: Arc<String>, logic: Lgc, - ) -> (WriteHandle, Self) { + ) -> (WriteHandle, Self) + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (command_send, command_recv) = mpsc::channel(20); let (writer_crash_send, writer_crash_recv) = oneshot::channel(); let (reader_crash_send, reader_crash_recv) = oneshot::channel(); @@ -314,7 +323,6 @@ impl SupervisorHandle { let connected = Connected { jid, write_handle: write_handle.clone(), - server, }; let supervisor_sender = SupervisorSender { @@ -341,13 +349,12 @@ impl SupervisorHandle { logic, ); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); ( write_handle, Self { sender: supervisor_sender, - handle, }, ) } diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 640ca8e..591a2cb 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -7,7 +7,8 @@ use std::{ time::Duration, }; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use futures::{future::Fuse, FutureExt}; +use luz::jabber_stream::bound_stream::BoundJabberReader; use stanza::client::Stanza; use stanza::stream::Error as StreamErrorStanza; use stanza::stream_error::Error as StreamError; @@ -15,6 +16,8 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::info; use crate::{Connected, Logic, WriteMessage}; @@ -23,12 +26,12 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor pub struct Read<Lgc> { - stream: BoundJabberReader<Tls>, + stream: BoundJabberReader, disconnecting: bool, - disconnect_timedout: oneshot::Receiver<()>, + disconnect_timedout: Fuse<oneshot::Receiver<()>>, // all the threads spawned by the current connection session - tasks: JoinSet<()>, + // tasks: JoinSet<()>, // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) @@ -45,13 +48,13 @@ pub struct Read<Lgc> { pub struct ReadState { pub supervisor_control: SupervisorSender, // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, + // pub tasks: JoinSet<()>, } impl<Lgc> Read<Lgc> { fn new( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, + stream: BoundJabberReader, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, @@ -62,8 +65,8 @@ impl<Lgc> Read<Lgc> { Self { stream, disconnecting: false, - disconnect_timedout: recv, - tasks, + disconnect_timedout: recv.fuse(), + // tasks, connected, logic, supervisor_control, @@ -73,7 +76,12 @@ impl<Lgc> Read<Lgc> { } } -impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { +impl<Lgc> Read<Lgc> +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::<Stanza>().await; @@ -91,7 +99,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { // when disconnect received, ReadControl::Disconnect => { let (send, recv) = oneshot::channel(); - self.disconnect_timedout = recv; + self.disconnect_timedout = recv.fuse(); self.disconnecting = true; tokio::spawn(async { tokio::time::sleep(Duration::from_secs(10)).await; @@ -99,7 +107,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { }) }, ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control }); break; }, }; @@ -111,11 +119,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { match s { Stanza::Error(error) => { self.logic.clone().handle_stream_error(error).await; - self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await; break; }, _ => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); } }; }, @@ -128,7 +136,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { let stream_error = match e { peanuts::Error::ReadError(error) => None, peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), - peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), + peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat), peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), @@ -139,9 +147,10 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), + _ => None, }; - let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control })); } break; }, @@ -168,7 +177,8 @@ pub enum ReadControl { pub struct ReadControlHandle { sender: mpsc::Sender<ReadControl>, - pub(crate) handle: JoinHandle<()>, + // TODO: same here + // pub(crate) handle: JoinHandle<()>, } impl Deref for ReadControlHandle { @@ -186,56 +196,64 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, + pub fn new<Lgc>( + stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - JoinSet::new(), + // JoinSet::new(), connected, logic, supervisor_control, control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } - pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, + pub fn reconnect<Lgc>( + stream: BoundJabberReader, + // tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, - ) -> Self { + ) -> Self + where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, + { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, - tasks, + // tasks, connected, logic, supervisor_control, control_receiver, on_crash, ); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } } diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 1070cdf..b982eea 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use luz::jabber_stream::bound_stream::BoundJabberWriter; use stanza::{ client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError, }; @@ -8,12 +8,14 @@ use tokio::{ sync::{mpsc, oneshot}, task::JoinHandle, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use crate::error::WriteError; /// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. pub struct Write { - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, /// connection session write queue stanza_receiver: mpsc::Receiver<WriteMessage>, @@ -41,7 +43,7 @@ pub enum WriteControl { impl Write { fn new( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, stanza_receiver: mpsc::Receiver<WriteMessage>, control_receiver: mpsc::Receiver<WriteControl>, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, @@ -192,7 +194,7 @@ impl DerefMut for WriteHandle { pub struct WriteControlHandle { sender: mpsc::Sender<WriteControl>, - pub(crate) handle: JoinHandle<()>, + // pub(crate) handle: JoinHandle<()>, } impl Deref for WriteControlHandle { @@ -211,14 +213,14 @@ impl DerefMut for WriteControlHandle { impl WriteControlHandle { pub fn new( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, ) -> (WriteHandle, Self) { let (control_sender, control_receiver) = mpsc::channel(20); let (stanza_sender, stanza_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); + tokio::spawn(async move { actor.run().await }); ( WriteHandle { @@ -226,13 +228,12 @@ impl WriteControlHandle { }, Self { sender: control_sender, - handle, }, ) } pub fn reconnect_retry( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver<WriteMessage>, retry_msg: WriteMessage, @@ -240,27 +241,26 @@ impl WriteControlHandle { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); + tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); Self { sender: control_sender, - handle, } } pub fn reconnect( - stream: BoundJabberWriter<Tls>, + stream: BoundJabberWriter, on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver<WriteMessage>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); + + tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, - handle, } } } diff --git a/lampada/src/error.rs b/lampada/src/error.rs index 8104155..a5af3a2 100644 --- a/lampada/src/error.rs +++ b/lampada/src/error.rs @@ -2,10 +2,13 @@ use std::sync::Arc; use stanza::client::Stanza; use thiserror::Error; -use tokio::{ - sync::{mpsc::error::SendError, oneshot::error::RecvError}, - time::error::Elapsed, -}; +use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::error::Elapsed; +#[cfg(target_arch = "wasm32")] +use tokio::time::Elapsed; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; #[derive(Debug, Error, Clone)] pub enum ConnectionError { @@ -21,10 +24,11 @@ pub enum ConnectionError { #[error("disconnected")] Disconnected, #[error("invalid server jid: {0}")] - InvalidServerJID(#[from] jid::ParseError), + InvalidServerJID(#[from] jid::JIDError), } #[derive(Debug, Error, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub enum CommandError<T> { #[error("actor: {0}")] Actor(ActorError), @@ -58,6 +62,7 @@ pub enum ReadError { } #[derive(Debug, Error, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub enum ActorError { #[error("receive timed out")] Timeout, diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index 7346c42..d1d451e 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(where_clause_attrs)] + use std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -10,6 +12,7 @@ pub use connection::write::WriteMessage; pub use connection::SupervisorSender; use error::ConnectionError; use futures::{future::Fuse, FutureExt}; +use jid::{BareJID, FullJID}; use luz::JID; use stanza::client::{ iq::{self, Iq, IqType}, @@ -21,6 +24,8 @@ use tokio::{ task::JoinSet, time::timeout, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::{debug, info}; use crate::connection::write::WriteHandle; @@ -32,24 +37,18 @@ pub mod error; #[derive(Clone)] pub struct Connected { // full jid will stay stable across reconnections - jid: JID, + jid: FullJID, write_handle: WriteHandle, - // the server jid - server: JID, } impl Connected { - pub fn jid(&self) -> &JID { + pub fn jid(&self) -> &FullJID { &self.jid } pub fn write_handle(&self) -> &WriteHandle { &self.write_handle } - - pub fn server(&self) -> &JID { - &self.server - } } /// everything that a particular xmpp client must implement @@ -58,20 +57,24 @@ pub trait Logic { type Cmd; /// run after binding to the stream (e.g. for a chat client, ) + #[cfg(not(target_arch = "wasm32"))] fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(not(target_arch = "wasm32"))] fn handle_disconnect( self, connection: Connected, ) -> impl std::future::Future<Output = ()> + Send; + #[cfg(not(target_arch = "wasm32"))] fn handle_stream_error( self, stream_error: StreamError, ) -> impl std::future::Future<Output = ()> + Send; /// run to handle an incoming xmpp stanza + #[cfg(not(target_arch = "wasm32"))] fn handle_stanza( self, stanza: Stanza, @@ -79,6 +82,7 @@ pub trait Logic { ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run to handle a command message when a connection is currently established + #[cfg(not(target_arch = "wasm32"))] fn handle_online( self, command: Self::Cmd, @@ -86,21 +90,67 @@ pub trait Logic { ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run to handle a command message when disconnected + #[cfg(not(target_arch = "wasm32"))] fn handle_offline( self, command: Self::Cmd, ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(not(target_arch = "wasm32"))] fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; /// handle connection errors from the core client logic + #[cfg(not(target_arch = "wasm32"))] fn handle_connection_error( self, error: ConnectionError, ) -> impl std::future::Future<Output = ()> + std::marker::Send; // async fn handle_stream_error(self, error) {} + #[cfg(target_arch = "wasm32")] + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(target_arch = "wasm32")] + fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + + #[cfg(target_arch = "wasm32")] + fn handle_stream_error( + self, + stream_error: StreamError, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle an incoming xmpp stanza + #[cfg(target_arch = "wasm32")] + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle a command message when a connection is currently established + #[cfg(target_arch = "wasm32")] + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle a command message when disconnected + #[cfg(target_arch = "wasm32")] + fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(target_arch = "wasm32")] + fn on_abort(self) -> impl std::future::Future<Output = ()>; + + /// handle connection errors from the core client logic + #[cfg(target_arch = "wasm32")] + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()>; } /// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core @@ -117,10 +167,15 @@ pub struct CoreClient<Lgc: Logic> { logic: Lgc, // config: LampConfig, // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? - tasks: JoinSet<()>, + // tasks: JoinSet<()>, } -impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { +impl<Lgc> CoreClient<Lgc> +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ /// create a new actor pub fn new( jid: JID, @@ -137,7 +192,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { receiver, connection_supervisor_shutdown, logic, - tasks: JoinSet::new(), + // tasks: JoinSet::new(), } } @@ -157,42 +212,27 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { else => break, }; match msg { - CoreClientCommand::Connect => { + CoreClientCommand::Connect(sender) => { match self.connected { Some(_) => { + sender.send(Err(ConnectionError::AlreadyConnected)); self.logic .clone() .handle_connection_error(ConnectionError::AlreadyConnected) .await; } None => { - let mut jid = self.jid.clone(); - let mut domain = jid.domainpart.clone(); // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) let streams_result = - luz::connect_and_login(&mut jid, &*self.password, &mut domain) - .await; - let server: JID = match domain.parse() { - Ok(j) => j, - Err(e) => { - self.logic - .clone() - .handle_connection_error(ConnectionError::InvalidServerJID( - e, - )) - .await; - continue; - } - }; + luz::connect_and_login(&self.jid, &*self.password).await; match streams_result { - Ok(s) => { + Ok((s, full_jid)) => { debug!("ok stream result"); let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); let (writer, supervisor) = SupervisorHandle::new( s, shutdown_send, - jid.clone(), - server.clone(), + full_jid.clone(), self.password.clone(), self.logic.clone(), ); @@ -201,23 +241,25 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { self.connection_supervisor_shutdown = shutdown_recv; let connected = Connected { - jid, + jid: full_jid.clone(), write_handle: writer, - server, }; self.logic.clone().handle_connect(connected.clone()).await; self.connected = Some((connected, supervisor)); + // REMEMBER TO NOTIFY IT@S GOOD + sender.send(Ok(full_jid.resourcepart)); } Err(e) => { tracing::error!("error: {}", e); self.logic .clone() .handle_connection_error(ConnectionError::ConnectionFailed( - e.into(), + e.clone().into(), )) .await; + sender.send(Err(ConnectionError::ConnectionFailed(e))); } } } @@ -240,10 +282,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { }, CoreClientCommand::Command(command) => { match self.connected.as_ref() { - Some((w, s)) => self - .tasks - .spawn(self.logic.clone().handle_online(command, w.clone())), - None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + Some((w, s)) => { + tokio::spawn(self.logic.clone().handle_online(command, w.clone())) + } + None => tokio::spawn(self.logic.clone().handle_offline(command)), }; } } @@ -255,7 +297,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { pub enum CoreClientCommand<C> { // TODO: login invisible xep-0186 /// connect to XMPP chat server. gets roster and publishes initial presence. - Connect, + Connect(oneshot::Sender<Result<String, ConnectionError>>), /// disconnect from XMPP chat server, sending unavailable presence then closing stream. Disconnect, /// TODO: generics |