aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/lib.rs')
-rw-r--r--lampada/src/lib.rs238
1 files changed, 238 insertions, 0 deletions
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
new file mode 100644
index 0000000..c61c596
--- /dev/null
+++ b/lampada/src/lib.rs
@@ -0,0 +1,238 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+pub use connection::write::WriteMessage;
+pub use connection::SupervisorSender;
+use error::ConnectionError;
+use futures::{future::Fuse, FutureExt};
+use luz::JID;
+use stanza::client::{
+ iq::{self, Iq, IqType},
+ Stanza,
+};
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::JoinSet,
+ time::timeout,
+};
+use tracing::{debug, info};
+
+use crate::connection::write::WriteHandle;
+use crate::connection::{SupervisorCommand, SupervisorHandle};
+
+mod connection;
+pub mod error;
+
+#[derive(Clone)]
+pub struct Connected {
+ // full jid will stay stable across reconnections
+ jid: JID,
+ write_handle: WriteHandle,
+}
+
+impl Connected {
+ pub fn jid(&self) -> &JID {
+ &self.jid
+ }
+
+ pub fn write_handle(&self) -> &WriteHandle {
+ &self.write_handle
+ }
+}
+
+/// everything that a particular xmpp client must implement
+pub trait Logic {
+ /// the command message type
+ type Cmd;
+
+ /// run after binding to the stream (e.g. for a chat client, )
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ fn handle_disconnect(
+ self,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run to handle an incoming xmpp stanza
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when a connection is currently established
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when disconnected
+ fn handle_offline(
+ self,
+ command: Self::Cmd,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// handle connection errors from the core client logic
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ // async fn handle_stream_error(self, error) {}
+}
+
+/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
+pub struct CoreClient<Lgc: Logic> {
+ jid: JID,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ password: Arc<String>,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // connected_intention: bool,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ // config: LampConfig,
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
+ tasks: JoinSet<()>,
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+ /// create a new actor
+ pub fn new(
+ jid: JID,
+ password: String,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ jid,
+ password: Arc::new(password),
+ connected,
+ receiver,
+ connection_supervisor_shutdown,
+ logic,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ /// run the actor
+ pub async fn run(mut self) {
+ loop {
+ let msg = tokio::select! {
+ // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
+ // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
+ _ = &mut self.connection_supervisor_shutdown => {
+ self.connected = None;
+ continue;
+ }
+ Some(msg) = self.receiver.recv() => {
+ msg
+ },
+ else => break,
+ };
+ match msg {
+ CoreClientCommand::Connect => {
+ match self.connected {
+ Some(_) => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyConnected)
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ luz::connect_and_login(&mut jid, &*self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ debug!("ok stream result");
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ shutdown_send,
+ jid.clone(),
+ self.password.clone(),
+ self.logic.clone(),
+ );
+
+ let shutdown_recv = shutdown_recv.fuse();
+ self.connection_supervisor_shutdown = shutdown_recv;
+
+ let connected = Connected {
+ jid,
+ write_handle: writer,
+ };
+
+ self.logic.clone().handle_connect(connected.clone()).await;
+
+ self.connected = Some((connected, supervisor));
+ }
+ Err(e) => {
+ tracing::error!("error: {}", e);
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::ConnectionFailed(
+ e.into(),
+ ))
+ .await;
+ }
+ }
+ }
+ };
+ }
+ CoreClientCommand::Disconnect => match self.connected {
+ None => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyDisconnected)
+ .await;
+ }
+ ref mut c => {
+ if let Some((connected, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ CoreClientCommand::Command(command) => {
+ match self.connected.as_ref() {
+ Some((w, s)) => self
+ .tasks
+ .spawn(self.logic.clone().handle_online(command, w.clone())),
+ None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ };
+ }
+ }
+ }
+ }
+}
+
+// TODO: generate methods for each with a macro
+pub enum CoreClientCommand<C> {
+ // TODO: login invisible xep-0186
+ /// connect to XMPP chat server. gets roster and publishes initial presence.
+ Connect,
+ /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
+ Disconnect,
+ /// TODO: generics
+ Command(C),
+}