diff options
Diffstat (limited to 'lampada/src/lib.rs')
-rw-r--r-- | lampada/src/lib.rs | 120 |
1 files changed, 81 insertions, 39 deletions
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index 7346c42..d1d451e 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(where_clause_attrs)] + use std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -10,6 +12,7 @@ pub use connection::write::WriteMessage; pub use connection::SupervisorSender; use error::ConnectionError; use futures::{future::Fuse, FutureExt}; +use jid::{BareJID, FullJID}; use luz::JID; use stanza::client::{ iq::{self, Iq, IqType}, @@ -21,6 +24,8 @@ use tokio::{ task::JoinSet, time::timeout, }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::{debug, info}; use crate::connection::write::WriteHandle; @@ -32,24 +37,18 @@ pub mod error; #[derive(Clone)] pub struct Connected { // full jid will stay stable across reconnections - jid: JID, + jid: FullJID, write_handle: WriteHandle, - // the server jid - server: JID, } impl Connected { - pub fn jid(&self) -> &JID { + pub fn jid(&self) -> &FullJID { &self.jid } pub fn write_handle(&self) -> &WriteHandle { &self.write_handle } - - pub fn server(&self) -> &JID { - &self.server - } } /// everything that a particular xmpp client must implement @@ -58,20 +57,24 @@ pub trait Logic { type Cmd; /// run after binding to the stream (e.g. for a chat client, ) + #[cfg(not(target_arch = "wasm32"))] fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(not(target_arch = "wasm32"))] fn handle_disconnect( self, connection: Connected, ) -> impl std::future::Future<Output = ()> + Send; + #[cfg(not(target_arch = "wasm32"))] fn handle_stream_error( self, stream_error: StreamError, ) -> impl std::future::Future<Output = ()> + Send; /// run to handle an incoming xmpp stanza + #[cfg(not(target_arch = "wasm32"))] fn handle_stanza( self, stanza: Stanza, @@ -79,6 +82,7 @@ pub trait Logic { ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run to handle a command message when a connection is currently established + #[cfg(not(target_arch = "wasm32"))] fn handle_online( self, command: Self::Cmd, @@ -86,21 +90,67 @@ pub trait Logic { ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run to handle a command message when disconnected + #[cfg(not(target_arch = "wasm32"))] fn handle_offline( self, command: Self::Cmd, ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(not(target_arch = "wasm32"))] fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; /// handle connection errors from the core client logic + #[cfg(not(target_arch = "wasm32"))] fn handle_connection_error( self, error: ConnectionError, ) -> impl std::future::Future<Output = ()> + std::marker::Send; // async fn handle_stream_error(self, error) {} + #[cfg(target_arch = "wasm32")] + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + #[cfg(target_arch = "wasm32")] + fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + + #[cfg(target_arch = "wasm32")] + fn handle_stream_error( + self, + stream_error: StreamError, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle an incoming xmpp stanza + #[cfg(target_arch = "wasm32")] + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle a command message when a connection is currently established + #[cfg(target_arch = "wasm32")] + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()>; + + /// run to handle a command message when disconnected + #[cfg(target_arch = "wasm32")] + fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + #[cfg(target_arch = "wasm32")] + fn on_abort(self) -> impl std::future::Future<Output = ()>; + + /// handle connection errors from the core client logic + #[cfg(target_arch = "wasm32")] + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()>; } /// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core @@ -117,10 +167,15 @@ pub struct CoreClient<Lgc: Logic> { logic: Lgc, // config: LampConfig, // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? - tasks: JoinSet<()>, + // tasks: JoinSet<()>, } -impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { +impl<Lgc> CoreClient<Lgc> +where + Lgc: Logic + Clone + 'static, + #[cfg(not(target_arch = "wasm32"))] + Lgc: Send, +{ /// create a new actor pub fn new( jid: JID, @@ -137,7 +192,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { receiver, connection_supervisor_shutdown, logic, - tasks: JoinSet::new(), + // tasks: JoinSet::new(), } } @@ -157,42 +212,27 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { else => break, }; match msg { - CoreClientCommand::Connect => { + CoreClientCommand::Connect(sender) => { match self.connected { Some(_) => { + sender.send(Err(ConnectionError::AlreadyConnected)); self.logic .clone() .handle_connection_error(ConnectionError::AlreadyConnected) .await; } None => { - let mut jid = self.jid.clone(); - let mut domain = jid.domainpart.clone(); // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) let streams_result = - luz::connect_and_login(&mut jid, &*self.password, &mut domain) - .await; - let server: JID = match domain.parse() { - Ok(j) => j, - Err(e) => { - self.logic - .clone() - .handle_connection_error(ConnectionError::InvalidServerJID( - e, - )) - .await; - continue; - } - }; + luz::connect_and_login(&self.jid, &*self.password).await; match streams_result { - Ok(s) => { + Ok((s, full_jid)) => { debug!("ok stream result"); let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); let (writer, supervisor) = SupervisorHandle::new( s, shutdown_send, - jid.clone(), - server.clone(), + full_jid.clone(), self.password.clone(), self.logic.clone(), ); @@ -201,23 +241,25 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { self.connection_supervisor_shutdown = shutdown_recv; let connected = Connected { - jid, + jid: full_jid.clone(), write_handle: writer, - server, }; self.logic.clone().handle_connect(connected.clone()).await; self.connected = Some((connected, supervisor)); + // REMEMBER TO NOTIFY IT@S GOOD + sender.send(Ok(full_jid.resourcepart)); } Err(e) => { tracing::error!("error: {}", e); self.logic .clone() .handle_connection_error(ConnectionError::ConnectionFailed( - e.into(), + e.clone().into(), )) .await; + sender.send(Err(ConnectionError::ConnectionFailed(e))); } } } @@ -240,10 +282,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { }, CoreClientCommand::Command(command) => { match self.connected.as_ref() { - Some((w, s)) => self - .tasks - .spawn(self.logic.clone().handle_online(command, w.clone())), - None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + Some((w, s)) => { + tokio::spawn(self.logic.clone().handle_online(command, w.clone())) + } + None => tokio::spawn(self.logic.clone().handle_offline(command)), }; } } @@ -255,7 +297,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { pub enum CoreClientCommand<C> { // TODO: login invisible xep-0186 /// connect to XMPP chat server. gets roster and publishes initial presence. - Connect, + Connect(oneshot::Sender<Result<String, ConnectionError>>), /// disconnect from XMPP chat server, sending unavailable presence then closing stream. Disconnect, /// TODO: generics |