diff options
author | 2025-03-26 14:29:40 +0000 | |
---|---|---|
committer | 2025-03-26 14:29:40 +0000 | |
commit | 2211f324782cdc617b4b5ecd071178e372539fe4 (patch) | |
tree | a5ea5ce11d748424447dee23173d3cb8aec648ea /lampada/src | |
parent | 2f8671978e18c1e1e7834056ae674f32fbde3868 (diff) | |
download | luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2 luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip |
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to '')
-rw-r--r-- | lampada/src/connection/mod.rs (renamed from luz/src/connection/mod.rs) | 19 | ||||
-rw-r--r-- | lampada/src/connection/read.rs (renamed from luz/src/connection/read.rs) | 15 | ||||
-rw-r--r-- | lampada/src/connection/write.rs (renamed from luz/src/connection/write.rs) | 2 | ||||
-rw-r--r-- | lampada/src/error.rs | 82 | ||||
-rw-r--r-- | lampada/src/lib.rs | 238 | ||||
-rw-r--r-- | lampada/src/main.rs (renamed from luz/src/main.rs) | 6 |
6 files changed, 337 insertions, 25 deletions
diff --git a/luz/src/connection/mod.rs b/lampada/src/connection/mod.rs index 288de70..1e767b0 100644 --- a/luz/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -7,8 +7,8 @@ use std::{ time::Duration, }; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use jid::JID; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use read::{ReadControl, ReadControlHandle, ReadState}; use stanza::client::Stanza; use tokio::{ @@ -19,9 +19,8 @@ use tracing::info; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; use crate::{ - db::Db, - error::{ConnectionError, Error, ReadError, WriteError}, - Connected, Logic, LogicState, UpdateMessage, + error::{ConnectionError, WriteError}, + Connected, Logic, }; mod read; @@ -84,7 +83,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { match msg { SupervisorCommand::Disconnect => { info!("disconnecting"); - // TODO: do handle_disconnect here + self.logic + .handle_disconnect(self.connected.clone()) + .await; let _ = self.write_control_handle.send(WriteControl::Disconnect).await; let _ = self.read_control_handle.send(ReadControl::Disconnect).await; info!("sent disconnect command"); @@ -108,11 +109,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { // send abort to read stream, as already done, consider let (read_state, mut write_state); match state { - // TODO: proper state things for read and write thread ChildState::Write(receiver) => { write_state = receiver; let (send, recv) = oneshot::channel(); let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else if let Ok(state) = recv.await { read_state = state; } else { @@ -135,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); @@ -182,7 +183,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); // TODO: same here - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); @@ -226,7 +227,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); diff --git a/luz/src/connection/read.rs b/lampada/src/connection/read.rs index 4e55bc5..cc69387 100644 --- a/luz/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -7,24 +7,15 @@ use std::{ time::Duration, }; -use chrono::{DateTime, Utc}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; -use uuid::Uuid; - -use crate::{ - chat::{Body, Message}, - db::Db, - error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, - presence::{Offline, Online, Presence, PresenceType, Show}, - roster::Contact, - Connected, Logic, LogicState, UpdateMessage, -}; + +use crate::{Connected, Logic}; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; diff --git a/luz/src/connection/write.rs b/lampada/src/connection/write.rs index ff78b81..8f0c34b 100644 --- a/luz/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot}, diff --git a/lampada/src/error.rs b/lampada/src/error.rs new file mode 100644 index 0000000..cdfb4db --- /dev/null +++ b/lampada/src/error.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use stanza::client::Stanza; +use thiserror::Error; +use tokio::{ + sync::{mpsc::error::SendError, oneshot::error::RecvError}, + time::error::Elapsed, +}; + +#[derive(Debug, Error, Clone)] +pub enum ConnectionError { + #[error("connection failed: {0}")] + ConnectionFailed(#[from] luz::Error), + #[error("already connected")] + AlreadyConnected, + #[error("already disconnected")] + AlreadyDisconnected, + #[error("lost connection")] + LostConnection, + // TODO: Display for Content + #[error("disconnected")] + Disconnected, +} + +#[derive(Debug, Error, Clone)] +pub enum CommandError<T> { + #[error("actor: {0}")] + Actor(ActorError), + #[error("{0}")] + Error(#[from] T), +} + +#[derive(Debug, Error, Clone)] +pub enum WriteError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, + // TODO: should this be in writeerror or separate? + #[error("actor: {0}")] + Actor(#[from] ActorError), + #[error("disconnected")] + Disconnected, +} + +// TODO: separate peanuts read and write error? +// TODO: which crate +#[derive(Debug, Error, Clone)] +pub enum ReadError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, +} + +#[derive(Debug, Error, Clone)] +pub enum ActorError { + #[error("receive timed out")] + Timeout, + #[error("could not send message to actor, channel closed")] + Send, + #[error("could not receive message from actor, channel closed")] + Receive, +} + +impl From<Elapsed> for ActorError { + fn from(_e: Elapsed) -> Self { + Self::Timeout + } +} + +impl<T> From<SendError<T>> for ActorError { + fn from(_e: SendError<T>) -> Self { + Self::Send + } +} + +impl From<RecvError> for ActorError { + fn from(_e: RecvError) -> Self { + Self::Receive + } +} diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs new file mode 100644 index 0000000..c61c596 --- /dev/null +++ b/lampada/src/lib.rs @@ -0,0 +1,238 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +pub use connection::write::WriteMessage; +pub use connection::SupervisorSender; +use error::ConnectionError; +use futures::{future::Fuse, FutureExt}; +use luz::JID; +use stanza::client::{ + iq::{self, Iq, IqType}, + Stanza, +}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::JoinSet, + time::timeout, +}; +use tracing::{debug, info}; + +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; + +mod connection; +pub mod error; + +#[derive(Clone)] +pub struct Connected { + // full jid will stay stable across reconnections + jid: JID, + write_handle: WriteHandle, +} + +impl Connected { + pub fn jid(&self) -> &JID { + &self.jid + } + + pub fn write_handle(&self) -> &WriteHandle { + &self.write_handle + } +} + +/// everything that a particular xmpp client must implement +pub trait Logic { + /// the command message type + type Cmd; + + /// run after binding to the stream (e.g. for a chat client, ) + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + fn handle_disconnect( + self, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + Send; + + /// run to handle an incoming xmpp stanza + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when a connection is currently established + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when disconnected + fn handle_offline( + self, + command: Self::Cmd, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// handle connection errors from the core client logic + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + // async fn handle_stream_error(self, error) {} +} + +/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core +pub struct CoreClient<Lgc: Logic> { + jid: JID, + // TODO: use a dyn passwordprovider trait to avoid storing password in memory + password: Arc<String>, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // connected_intention: bool, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + // config: LampConfig, + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? + tasks: JoinSet<()>, +} + +impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { + /// create a new actor + pub fn new( + jid: JID, + password: String, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + ) -> Self { + Self { + jid, + password: Arc::new(password), + connected, + receiver, + connection_supervisor_shutdown, + logic, + tasks: JoinSet::new(), + } + } + + /// run the actor + pub async fn run(mut self) { + loop { + let msg = tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? + _ = &mut self.connection_supervisor_shutdown => { + self.connected = None; + continue; + } + Some(msg) = self.receiver.recv() => { + msg + }, + else => break, + }; + match msg { + CoreClientCommand::Connect => { + match self.connected { + Some(_) => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyConnected) + .await; + } + None => { + let mut jid = self.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + luz::connect_and_login(&mut jid, &*self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + debug!("ok stream result"); + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + shutdown_send, + jid.clone(), + self.password.clone(), + self.logic.clone(), + ); + + let shutdown_recv = shutdown_recv.fuse(); + self.connection_supervisor_shutdown = shutdown_recv; + + let connected = Connected { + jid, + write_handle: writer, + }; + + self.logic.clone().handle_connect(connected.clone()).await; + + self.connected = Some((connected, supervisor)); + } + Err(e) => { + tracing::error!("error: {}", e); + self.logic + .clone() + .handle_connection_error(ConnectionError::ConnectionFailed( + e.into(), + )) + .await; + } + } + } + }; + } + CoreClientCommand::Disconnect => match self.connected { + None => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyDisconnected) + .await; + } + ref mut c => { + if let Some((connected, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + CoreClientCommand::Command(command) => { + match self.connected.as_ref() { + Some((w, s)) => self + .tasks + .spawn(self.logic.clone().handle_online(command, w.clone())), + None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + }; + } + } + } + } +} + +// TODO: generate methods for each with a macro +pub enum CoreClientCommand<C> { + // TODO: login invisible xep-0186 + /// connect to XMPP chat server. gets roster and publishes initial presence. + Connect, + /// disconnect from XMPP chat server, sending unavailable presence then closing stream. + Disconnect, + /// TODO: generics + Command(C), +} diff --git a/luz/src/main.rs b/lampada/src/main.rs index 5aeef14..7b7469d 100644 --- a/luz/src/main.rs +++ b/lampada/src/main.rs @@ -1,7 +1,7 @@ use std::{path::Path, str::FromStr, time::Duration}; use jid::JID; -use luz::{db::Db, LuzHandle, LuzMessage}; +use lampada::{db::Db, CoreClientCommand, LuzHandle}; use sqlx::SqlitePool; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -24,11 +24,11 @@ async fn main() { } }); - luz.send(LuzMessage::Connect).await.unwrap(); + luz.send(CoreClientCommand::Connect).await.unwrap(); let (send, recv) = oneshot::channel(); tokio::time::sleep(Duration::from_secs(5)).await; info!("sending message"); - luz.send(LuzMessage::SendMessage( + luz.send(CoreClientCommand::SendMessage( JID::from_str("cel@blos.sm").unwrap(), luz::chat::Body { body: "hallo!!!".to_string(), |