diff options
Diffstat (limited to 'lampada/src/lib.rs')
-rw-r--r-- | lampada/src/lib.rs | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs new file mode 100644 index 0000000..c61c596 --- /dev/null +++ b/lampada/src/lib.rs @@ -0,0 +1,238 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +pub use connection::write::WriteMessage; +pub use connection::SupervisorSender; +use error::ConnectionError; +use futures::{future::Fuse, FutureExt}; +use luz::JID; +use stanza::client::{ + iq::{self, Iq, IqType}, + Stanza, +}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::JoinSet, + time::timeout, +}; +use tracing::{debug, info}; + +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; + +mod connection; +pub mod error; + +#[derive(Clone)] +pub struct Connected { + // full jid will stay stable across reconnections + jid: JID, + write_handle: WriteHandle, +} + +impl Connected { + pub fn jid(&self) -> &JID { + &self.jid + } + + pub fn write_handle(&self) -> &WriteHandle { + &self.write_handle + } +} + +/// everything that a particular xmpp client must implement +pub trait Logic { + /// the command message type + type Cmd; + + /// run after binding to the stream (e.g. for a chat client, ) + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + fn handle_disconnect( + self, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + Send; + + /// run to handle an incoming xmpp stanza + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when a connection is currently established + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when disconnected + fn handle_offline( + self, + command: Self::Cmd, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// handle connection errors from the core client logic + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + // async fn handle_stream_error(self, error) {} +} + +/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core +pub struct CoreClient<Lgc: Logic> { + jid: JID, + // TODO: use a dyn passwordprovider trait to avoid storing password in memory + password: Arc<String>, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // connected_intention: bool, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + // config: LampConfig, + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? + tasks: JoinSet<()>, +} + +impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { + /// create a new actor + pub fn new( + jid: JID, + password: String, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + ) -> Self { + Self { + jid, + password: Arc::new(password), + connected, + receiver, + connection_supervisor_shutdown, + logic, + tasks: JoinSet::new(), + } + } + + /// run the actor + pub async fn run(mut self) { + loop { + let msg = tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? + _ = &mut self.connection_supervisor_shutdown => { + self.connected = None; + continue; + } + Some(msg) = self.receiver.recv() => { + msg + }, + else => break, + }; + match msg { + CoreClientCommand::Connect => { + match self.connected { + Some(_) => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyConnected) + .await; + } + None => { + let mut jid = self.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + luz::connect_and_login(&mut jid, &*self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + debug!("ok stream result"); + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + shutdown_send, + jid.clone(), + self.password.clone(), + self.logic.clone(), + ); + + let shutdown_recv = shutdown_recv.fuse(); + self.connection_supervisor_shutdown = shutdown_recv; + + let connected = Connected { + jid, + write_handle: writer, + }; + + self.logic.clone().handle_connect(connected.clone()).await; + + self.connected = Some((connected, supervisor)); + } + Err(e) => { + tracing::error!("error: {}", e); + self.logic + .clone() + .handle_connection_error(ConnectionError::ConnectionFailed( + e.into(), + )) + .await; + } + } + } + }; + } + CoreClientCommand::Disconnect => match self.connected { + None => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyDisconnected) + .await; + } + ref mut c => { + if let Some((connected, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + CoreClientCommand::Command(command) => { + match self.connected.as_ref() { + Some((w, s)) => self + .tasks + .spawn(self.logic.clone().handle_online(command, w.clone())), + None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + }; + } + } + } + } +} + +// TODO: generate methods for each with a macro +pub enum CoreClientCommand<C> { + // TODO: login invisible xep-0186 + /// connect to XMPP chat server. gets roster and publishes initial presence. + Connect, + /// disconnect from XMPP chat server, sending unavailable presence then closing stream. + Disconnect, + /// TODO: generics + Command(C), +} |