diff options
author | 2025-03-26 01:51:21 +0000 | |
---|---|---|
committer | 2025-03-26 01:51:21 +0000 | |
commit | 2f8671978e18c1e1e7834056ae674f32fbde3868 (patch) | |
tree | 8f4892b3d00bd03c57a93ab8cf2978d1f465d866 | |
parent | 5272456beab233acb53a140db6a85da5f4ccd2d2 (diff) | |
download | luz-2f8671978e18c1e1e7834056ae674f32fbde3868.tar.gz luz-2f8671978e18c1e1e7834056ae674f32fbde3868.tar.bz2 luz-2f8671978e18c1e1e7834056ae674f32fbde3868.zip |
refactor(luz): genericize logic into trait
-rw-r--r-- | luz/src/connection/mod.rs | 22 | ||||
-rw-r--r-- | luz/src/connection/read.rs | 27 | ||||
-rw-r--r-- | luz/src/error.rs | 37 | ||||
-rw-r--r-- | luz/src/lib.rs | 183 |
4 files changed, 131 insertions, 138 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index e209d47..288de70 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -20,14 +20,14 @@ use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteSt use crate::{ db::Db, - error::{Error, ReadError, WriteError}, - Connected, LogicState, UpdateMessage, + error::{ConnectionError, Error, ReadError, WriteError}, + Connected, Logic, LogicState, UpdateMessage, }; mod read; pub(crate) mod write; -pub struct Supervisor { +pub struct Supervisor<Lgc> { command_recv: mpsc::Receiver<SupervisorCommand>, reader_crash: oneshot::Receiver<ReadState>, writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, @@ -37,7 +37,7 @@ pub struct Supervisor { // jid in connected stays the same over the life of the supervisor (the connection session) connected: Connected, password: Arc<String>, - logic: LogicState, + logic: Lgc, } pub enum SupervisorCommand { @@ -52,7 +52,7 @@ pub enum ChildState { Read(ReadState), } -impl Supervisor { +impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { fn new( command_recv: mpsc::Receiver<SupervisorCommand>, reader_crash: oneshot::Receiver<ReadState>, @@ -62,7 +62,7 @@ impl Supervisor { on_crash: oneshot::Sender<()>, connected: Connected, password: Arc<String>, - logic: LogicState, + logic: Lgc, ) -> Self { Self { command_recv, @@ -161,7 +161,7 @@ impl Supervisor { let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? - let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; + self.logic.handle_connection_error(ConnectionError::LostConnection).await; break; }, } @@ -209,7 +209,7 @@ impl Supervisor { let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error to send? - let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; + self.logic.handle_connection_error(ConnectionError::LostConnection).await; break; }, } @@ -259,7 +259,7 @@ impl Supervisor { msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? - let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; + self.logic.handle_connection_error(ConnectionError::LostConnection).await; break; }, } @@ -311,12 +311,12 @@ impl DerefMut for SupervisorSender { } impl SupervisorHandle { - pub fn new( + pub fn new<Lgc: Logic + Clone + Send + 'static>( streams: BoundJabberStream<Tls>, on_crash: oneshot::Sender<()>, jid: JID, password: Arc<String>, - logic: LogicState, + logic: Lgc, ) -> (WriteHandle, Self) { let (command_send, command_recv) = mpsc::channel(20); let (writer_crash_send, writer_crash_recv) = oneshot::channel(); diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index d310a12..4e55bc5 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + marker::PhantomData, ops::{Deref, DerefMut}, str::FromStr, sync::Arc, @@ -22,13 +23,13 @@ use crate::{ error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, - Connected, LogicState, UpdateMessage, + Connected, Logic, LogicState, UpdateMessage, }; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor -pub struct Read { +pub struct Read<Lgc> { stream: BoundJabberReader<Tls>, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, @@ -39,7 +40,7 @@ pub struct Read { // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, // control stuff @@ -54,12 +55,12 @@ pub struct ReadState { pub tasks: JoinSet<()>, } -impl Read { +impl<Lgc> Read<Lgc> { fn new( stream: BoundJabberReader<Tls>, tasks: JoinSet<()>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, control_receiver: mpsc::Receiver<ReadControl>, on_crash: oneshot::Sender<ReadState>, @@ -77,7 +78,9 @@ impl Read { on_crash, } } +} +impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::<Stanza>().await; @@ -149,11 +152,7 @@ impl Read { } } println!("stopping read thread"); - // when it aborts, must clear iq map no matter what - let mut iqs = self.logic.pending.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } + self.logic.on_abort().await; } } @@ -188,10 +187,10 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new( + pub fn new<Lgc: Clone + Logic + Send + 'static>( stream: BoundJabberReader<Tls>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<ReadState>, ) -> Self { @@ -214,11 +213,11 @@ impl ReadControlHandle { } } - pub fn reconnect( + pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( stream: BoundJabberReader<Tls>, tasks: JoinSet<()>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<ReadState>, ) -> Self { diff --git a/luz/src/error.rs b/luz/src/error.rs index dbe3e25..46f45a8 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -8,12 +8,32 @@ use tokio::{ }; #[derive(Debug, Error, Clone)] -pub enum Error { +pub enum ConnectionError { + #[error("connection failed: {0}")] + ConnectionFailed(#[from] jabber::Error), #[error("already connected")] AlreadyConnected, + #[error("already disconnected")] + AlreadyDisconnected, + #[error("lost connection")] + LostConnection, + // TODO: Display for Content + #[error("disconnected")] + Disconnected, +} + +// for the client logic impl +#[derive(Debug, Error, Clone)] +pub enum Error { + #[error("core error: {0}")] + Connection(#[from] ConnectionError), + #[error("received unrecognized/unsupported content: {0:?}")] + UnrecognizedContent(peanuts::element::Content), + #[error("iq receive error: {0}")] + Iq(IqError), // TODO: change to Connecting(ConnectingError) #[error("connecting: {0}")] - Connecting(#[from] ConnectionError), + Connecting(#[from] ConnectionJobError), #[error("presence: {0}")] Presence(#[from] PresenceError), #[error("set status: {0}")] @@ -27,17 +47,6 @@ pub enum Error { MessageSend(MessageSendError), #[error("message receive error: {0}")] MessageRecv(MessageRecvError), - #[error("already disconnected")] - AlreadyDisconnected, - #[error("lost connection")] - LostConnection, - // TODO: Display for Content - #[error("received unrecognized/unsupported content: {0:?}")] - UnrecognizedContent(peanuts::element::Content), - #[error("iq receive error: {0}")] - Iq(IqError), - #[error("disconnected")] - Disconnected, } #[derive(Debug, Error, Clone)] @@ -80,7 +89,7 @@ pub enum MessageRecvError { } #[derive(Debug, Clone, Error)] -pub enum ConnectionError { +pub enum ConnectionJobError { #[error("connection failed: {0}")] ConnectionFailed(#[from] jabber::Error), #[error("failed roster retreival: {0}")] diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 3498ff1..b9c482c 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -11,8 +11,8 @@ use chrono::Utc; use connection::{write::WriteMessage, SupervisorSender}; use db::Db; use error::{ - ActorError, CommandError, ConnectionError, DatabaseError, IqError, MessageRecvError, - PresenceError, ReadError, RosterError, StatusError, WriteError, + ActorError, CommandError, ConnectionError, ConnectionJobError, DatabaseError, IqError, + MessageRecvError, PresenceError, ReadError, RosterError, StatusError, WriteError, }; use futures::{future::Fuse, FutureExt}; use jabber::JID; @@ -103,7 +103,7 @@ pub enum Command { #[derive(Debug)] pub struct Client { - sender: mpsc::Sender<LuzMessage>, + sender: mpsc::Sender<LuzMessage<Command>>, timeout: Duration, } @@ -117,7 +117,7 @@ impl Clone for Client { } impl Deref for Client { - type Target = mpsc::Sender<LuzMessage>; + type Target = mpsc::Sender<LuzMessage<Command>>; fn deref(&self) -> &Self::Target { &self.sender @@ -155,7 +155,8 @@ impl Client { update_sender: update_send, }; - let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic); + let actor: Luz<LogicState> = + Luz::new(jid, password, command_receiver, None, sup_recv, logic); tokio::spawn(async move { actor.run().await }); ( @@ -448,8 +449,10 @@ pub struct LogicState { update_sender: mpsc::Sender<UpdateMessage>, } -impl LogicState { - pub async fn handle_connect(self, connection: Connected) { +impl Logic for LogicState { + type Cmd = Command; + + async fn handle_connect(self, connection: Connected) { let (send, recv) = oneshot::channel(); debug!("getting roster"); self.clone() @@ -468,7 +471,7 @@ impl LogicState { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( - ConnectionError::StatusCacheError(e.into()), + ConnectionJobError::StatusCacheError(e.into()), ))) .await; Online::default() @@ -501,7 +504,7 @@ impl LogicState { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( - ConnectionError::SendPresence(WriteError::Actor(e.into())), + ConnectionJobError::SendPresence(WriteError::Actor(e.into())), ))) .await; } @@ -518,7 +521,7 @@ impl LogicState { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( - ConnectionError::RosterRetreival(RosterError::Write(WriteError::Actor( + ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( e.into(), ))), ))) @@ -527,7 +530,7 @@ impl LogicState { } } - pub async fn handle_disconnect(self, connection: Connected) { + async fn handle_disconnect(self, connection: Connected) { // TODO: be able to set offline status message let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); @@ -540,10 +543,7 @@ impl LogicState { .await; } - /// stanza errors (recoverable) - pub async fn handle_error(self, error: Error) {} - - pub async fn handle_stanza( + async fn handle_stanza( self, stanza: Stanza, connection: Connected, @@ -790,8 +790,7 @@ impl LogicState { } } - // pub async fn handle_stream_error(self, error) {} - pub async fn handle_online(self, command: Command, connection: Connected) { + async fn handle_online(self, command: Command, connection: Connected) { match command { Command::GetRoster(result_sender) => { // TODO: jid resource should probably be stored within the connection @@ -1438,7 +1437,7 @@ impl LogicState { } } - pub async fn handle_offline(self, command: Command) { + async fn handle_offline(self, command: Command) { match command { Command::GetRoster(sender) => { let roster = self.db.read_cached_roster().await; @@ -1539,6 +1538,24 @@ impl LogicState { } } } + // pub async fn handle_stream_error(self, error) {} + // stanza errors (recoverable) + // pub async fn handle_error(self, error: Error) {} + // when it aborts, must clear iq map no matter what + async fn on_abort(self) { + let mut iqs = self.pending.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } + } + + async fn handle_connection_error(self, error: ConnectionError) { + self.update_sender + .send(UpdateMessage::Error( + ConnectionError::AlreadyConnected.into(), + )) + .await; + } } #[derive(Clone)] @@ -1548,12 +1565,42 @@ pub struct Connected { write_handle: WriteHandle, } -pub trait Logic {} +pub trait Logic { + type Cmd; + + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; + fn handle_disconnect( + self, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + Send; + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + fn handle_offline( + self, + command: Self::Cmd, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; + // TODO: look at these + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + // async fn handle_stream_error(self, error) {} +} -pub struct Luz { +pub struct Luz<Lgc: Logic> { jid: JID, password: Arc<String>, - receiver: mpsc::Receiver<LuzMessage>, + receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>, // TODO: use a dyn passwordprovider trait to avoid storing password in memory connected: Option<(Connected, SupervisorHandle)>, // connected_intention: bool, @@ -1562,19 +1609,19 @@ pub struct Luz { // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? // TODO: genericize - logic: LogicState, + logic: Lgc, // config: LampConfig, tasks: JoinSet<()>, } -impl Luz { +impl<Lgc: Logic + Clone + Send + 'static> Luz<Lgc> { fn new( jid: JID, password: String, - receiver: mpsc::Receiver<LuzMessage>, + receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>, connected: Option<(Connected, SupervisorHandle)>, connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, - logic: LogicState, + logic: Lgc, ) -> Self { Self { jid, @@ -1609,8 +1656,7 @@ impl Luz { Some(_) => { self.logic .clone() - .update_sender - .send(UpdateMessage::Error(Error::AlreadyConnected)) + .handle_connection_error(ConnectionError::AlreadyConnected) .await; } None => { @@ -1646,11 +1692,12 @@ impl Luz { } Err(e) => { tracing::error!("error: {}", e); - let _ = self.logic.clone().update_sender.send( - UpdateMessage::Error(Error::Connecting( - ConnectionError::ConnectionFailed(e.into()), - )), - ); + self.logic + .clone() + .handle_connection_error(ConnectionError::ConnectionFailed( + e.into(), + )) + .await; } } } @@ -1658,10 +1705,9 @@ impl Luz { } LuzMessage::Disconnect => match self.connected { None => { - let _ = self - .logic - .update_sender - .send(UpdateMessage::Error(Error::AlreadyDisconnected)) + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyDisconnected) .await; } ref mut c => { @@ -1691,76 +1737,15 @@ impl Luz { } } -// TODO: separate sender and receiver, store handle to Luz process to ensure dropping -// #[derive(Clone)] -#[derive(Debug)] -pub struct LuzHandle { - sender: mpsc::Sender<LuzMessage>, - timeout: Duration, -} - -impl Clone for LuzHandle { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - timeout: self.timeout, - } - } -} - -impl Deref for LuzHandle { - type Target = mpsc::Sender<LuzMessage>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for LuzHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl LuzHandle { - // TODO: database creation separate - pub fn new(jid: JID, password: String, logic: LogicState) -> Self { - let (command_sender, command_receiver) = mpsc::channel(20); - - // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) - let (_sup_send, sup_recv) = oneshot::channel(); - let sup_recv = sup_recv.fuse(); - - let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic); - tokio::spawn(async move { actor.run().await }); - - Self { - sender: command_sender, - // TODO: configure timeout - timeout: Duration::from_secs(10), - } - } - - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(LuzMessage::Connect).await?; - Ok(()) - } - - pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { - self.send(LuzMessage::Disconnect).await?; - Ok(()) - } -} - // TODO: generate methods for each with a macro -pub enum LuzMessage { +pub enum LuzMessage<C> { // TODO: login invisible xep-0186 /// connect to XMPP chat server. gets roster and publishes initial presence. Connect, /// disconnect from XMPP chat server, sending unavailable presence then closing stream. Disconnect, /// TODO: generics - Command(Command), + Command(C), } #[derive(Debug, Clone)] |