aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 01:51:21 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 01:51:21 +0000
commit2f8671978e18c1e1e7834056ae674f32fbde3868 (patch)
tree8f4892b3d00bd03c57a93ab8cf2978d1f465d866
parent5272456beab233acb53a140db6a85da5f4ccd2d2 (diff)
downloadluz-2f8671978e18c1e1e7834056ae674f32fbde3868.tar.gz
luz-2f8671978e18c1e1e7834056ae674f32fbde3868.tar.bz2
luz-2f8671978e18c1e1e7834056ae674f32fbde3868.zip
refactor(luz): genericize logic into trait
-rw-r--r--luz/src/connection/mod.rs22
-rw-r--r--luz/src/connection/read.rs27
-rw-r--r--luz/src/error.rs37
-rw-r--r--luz/src/lib.rs183
4 files changed, 131 insertions, 138 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index e209d47..288de70 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -20,14 +20,14 @@ use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteSt
use crate::{
db::Db,
- error::{Error, ReadError, WriteError},
- Connected, LogicState, UpdateMessage,
+ error::{ConnectionError, Error, ReadError, WriteError},
+ Connected, Logic, LogicState, UpdateMessage,
};
mod read;
pub(crate) mod write;
-pub struct Supervisor {
+pub struct Supervisor<Lgc> {
command_recv: mpsc::Receiver<SupervisorCommand>,
reader_crash: oneshot::Receiver<ReadState>,
writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
@@ -37,7 +37,7 @@ pub struct Supervisor {
// jid in connected stays the same over the life of the supervisor (the connection session)
connected: Connected,
password: Arc<String>,
- logic: LogicState,
+ logic: Lgc,
}
pub enum SupervisorCommand {
@@ -52,7 +52,7 @@ pub enum ChildState {
Read(ReadState),
}
-impl Supervisor {
+impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
reader_crash: oneshot::Receiver<ReadState>,
@@ -62,7 +62,7 @@ impl Supervisor {
on_crash: oneshot::Sender<()>,
connected: Connected,
password: Arc<String>,
- logic: LogicState,
+ logic: Lgc,
) -> Self {
Self {
command_recv,
@@ -161,7 +161,7 @@ impl Supervisor {
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
- let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
break;
},
}
@@ -209,7 +209,7 @@ impl Supervisor {
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error to send?
- let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
break;
},
}
@@ -259,7 +259,7 @@ impl Supervisor {
msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
- let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
break;
},
}
@@ -311,12 +311,12 @@ impl DerefMut for SupervisorSender {
}
impl SupervisorHandle {
- pub fn new(
+ pub fn new<Lgc: Logic + Clone + Send + 'static>(
streams: BoundJabberStream<Tls>,
on_crash: oneshot::Sender<()>,
jid: JID,
password: Arc<String>,
- logic: LogicState,
+ logic: Lgc,
) -> (WriteHandle, Self) {
let (command_send, command_recv) = mpsc::channel(20);
let (writer_crash_send, writer_crash_recv) = oneshot::channel();
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index d310a12..4e55bc5 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,5 +1,6 @@
use std::{
collections::HashMap,
+ marker::PhantomData,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
@@ -22,13 +23,13 @@ use crate::{
error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
- Connected, LogicState, UpdateMessage,
+ Connected, Logic, LogicState, UpdateMessage,
};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
-pub struct Read {
+pub struct Read<Lgc> {
stream: BoundJabberReader<Tls>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
@@ -39,7 +40,7 @@ pub struct Read {
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
// control stuff
@@ -54,12 +55,12 @@ pub struct ReadState {
pub tasks: JoinSet<()>,
}
-impl Read {
+impl<Lgc> Read<Lgc> {
fn new(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
control_receiver: mpsc::Receiver<ReadControl>,
on_crash: oneshot::Sender<ReadState>,
@@ -77,7 +78,9 @@ impl Read {
on_crash,
}
}
+}
+impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
@@ -149,11 +152,7 @@ impl Read {
}
}
println!("stopping read thread");
- // when it aborts, must clear iq map no matter what
- let mut iqs = self.logic.pending.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(ReadError::LostConnection));
- }
+ self.logic.on_abort().await;
}
}
@@ -188,10 +187,10 @@ impl DerefMut for ReadControlHandle {
}
impl ReadControlHandle {
- pub fn new(
+ pub fn new<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
@@ -214,11 +213,11 @@ impl ReadControlHandle {
}
}
- pub fn reconnect(
+ pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
diff --git a/luz/src/error.rs b/luz/src/error.rs
index dbe3e25..46f45a8 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -8,12 +8,32 @@ use tokio::{
};
#[derive(Debug, Error, Clone)]
-pub enum Error {
+pub enum ConnectionError {
+ #[error("connection failed: {0}")]
+ ConnectionFailed(#[from] jabber::Error),
#[error("already connected")]
AlreadyConnected,
+ #[error("already disconnected")]
+ AlreadyDisconnected,
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: Display for Content
+ #[error("disconnected")]
+ Disconnected,
+}
+
+// for the client logic impl
+#[derive(Debug, Error, Clone)]
+pub enum Error {
+ #[error("core error: {0}")]
+ Connection(#[from] ConnectionError),
+ #[error("received unrecognized/unsupported content: {0:?}")]
+ UnrecognizedContent(peanuts::element::Content),
+ #[error("iq receive error: {0}")]
+ Iq(IqError),
// TODO: change to Connecting(ConnectingError)
#[error("connecting: {0}")]
- Connecting(#[from] ConnectionError),
+ Connecting(#[from] ConnectionJobError),
#[error("presence: {0}")]
Presence(#[from] PresenceError),
#[error("set status: {0}")]
@@ -27,17 +47,6 @@ pub enum Error {
MessageSend(MessageSendError),
#[error("message receive error: {0}")]
MessageRecv(MessageRecvError),
- #[error("already disconnected")]
- AlreadyDisconnected,
- #[error("lost connection")]
- LostConnection,
- // TODO: Display for Content
- #[error("received unrecognized/unsupported content: {0:?}")]
- UnrecognizedContent(peanuts::element::Content),
- #[error("iq receive error: {0}")]
- Iq(IqError),
- #[error("disconnected")]
- Disconnected,
}
#[derive(Debug, Error, Clone)]
@@ -80,7 +89,7 @@ pub enum MessageRecvError {
}
#[derive(Debug, Clone, Error)]
-pub enum ConnectionError {
+pub enum ConnectionJobError {
#[error("connection failed: {0}")]
ConnectionFailed(#[from] jabber::Error),
#[error("failed roster retreival: {0}")]
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 3498ff1..b9c482c 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -11,8 +11,8 @@ use chrono::Utc;
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
use error::{
- ActorError, CommandError, ConnectionError, DatabaseError, IqError, MessageRecvError,
- PresenceError, ReadError, RosterError, StatusError, WriteError,
+ ActorError, CommandError, ConnectionError, ConnectionJobError, DatabaseError, IqError,
+ MessageRecvError, PresenceError, ReadError, RosterError, StatusError, WriteError,
};
use futures::{future::Fuse, FutureExt};
use jabber::JID;
@@ -103,7 +103,7 @@ pub enum Command {
#[derive(Debug)]
pub struct Client {
- sender: mpsc::Sender<LuzMessage>,
+ sender: mpsc::Sender<LuzMessage<Command>>,
timeout: Duration,
}
@@ -117,7 +117,7 @@ impl Clone for Client {
}
impl Deref for Client {
- type Target = mpsc::Sender<LuzMessage>;
+ type Target = mpsc::Sender<LuzMessage<Command>>;
fn deref(&self) -> &Self::Target {
&self.sender
@@ -155,7 +155,8 @@ impl Client {
update_sender: update_send,
};
- let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic);
+ let actor: Luz<LogicState> =
+ Luz::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
(
@@ -448,8 +449,10 @@ pub struct LogicState {
update_sender: mpsc::Sender<UpdateMessage>,
}
-impl LogicState {
- pub async fn handle_connect(self, connection: Connected) {
+impl Logic for LogicState {
+ type Cmd = Command;
+
+ async fn handle_connect(self, connection: Connected) {
let (send, recv) = oneshot::channel();
debug!("getting roster");
self.clone()
@@ -468,7 +471,7 @@ impl LogicState {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
- ConnectionError::StatusCacheError(e.into()),
+ ConnectionJobError::StatusCacheError(e.into()),
)))
.await;
Online::default()
@@ -501,7 +504,7 @@ impl LogicState {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
- ConnectionError::SendPresence(WriteError::Actor(e.into())),
+ ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
)))
.await;
}
@@ -518,7 +521,7 @@ impl LogicState {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
- ConnectionError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
e.into(),
))),
)))
@@ -527,7 +530,7 @@ impl LogicState {
}
}
- pub async fn handle_disconnect(self, connection: Connected) {
+ async fn handle_disconnect(self, connection: Connected) {
// TODO: be able to set offline status message
let offline_presence: stanza::client::presence::Presence =
Offline::default().into_stanza(None);
@@ -540,10 +543,7 @@ impl LogicState {
.await;
}
- /// stanza errors (recoverable)
- pub async fn handle_error(self, error: Error) {}
-
- pub async fn handle_stanza(
+ async fn handle_stanza(
self,
stanza: Stanza,
connection: Connected,
@@ -790,8 +790,7 @@ impl LogicState {
}
}
- // pub async fn handle_stream_error(self, error) {}
- pub async fn handle_online(self, command: Command, connection: Connected) {
+ async fn handle_online(self, command: Command, connection: Connected) {
match command {
Command::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
@@ -1438,7 +1437,7 @@ impl LogicState {
}
}
- pub async fn handle_offline(self, command: Command) {
+ async fn handle_offline(self, command: Command) {
match command {
Command::GetRoster(sender) => {
let roster = self.db.read_cached_roster().await;
@@ -1539,6 +1538,24 @@ impl LogicState {
}
}
}
+ // pub async fn handle_stream_error(self, error) {}
+ // stanza errors (recoverable)
+ // pub async fn handle_error(self, error: Error) {}
+ // when it aborts, must clear iq map no matter what
+ async fn on_abort(self) {
+ let mut iqs = self.pending.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+
+ async fn handle_connection_error(self, error: ConnectionError) {
+ self.update_sender
+ .send(UpdateMessage::Error(
+ ConnectionError::AlreadyConnected.into(),
+ ))
+ .await;
+ }
}
#[derive(Clone)]
@@ -1548,12 +1565,42 @@ pub struct Connected {
write_handle: WriteHandle,
}
-pub trait Logic {}
+pub trait Logic {
+ type Cmd;
+
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
+ fn handle_disconnect(
+ self,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + Send;
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+ fn handle_offline(
+ self,
+ command: Self::Cmd,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+ fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
+ // TODO: look at these
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+ // async fn handle_stream_error(self, error) {}
+}
-pub struct Luz {
+pub struct Luz<Lgc: Logic> {
jid: JID,
password: Arc<String>,
- receiver: mpsc::Receiver<LuzMessage>,
+ receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
connected: Option<(Connected, SupervisorHandle)>,
// connected_intention: bool,
@@ -1562,19 +1609,19 @@ pub struct Luz {
// TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
// TODO: genericize
- logic: LogicState,
+ logic: Lgc,
// config: LampConfig,
tasks: JoinSet<()>,
}
-impl Luz {
+impl<Lgc: Logic + Clone + Send + 'static> Luz<Lgc> {
fn new(
jid: JID,
password: String,
- receiver: mpsc::Receiver<LuzMessage>,
+ receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>,
connected: Option<(Connected, SupervisorHandle)>,
connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
- logic: LogicState,
+ logic: Lgc,
) -> Self {
Self {
jid,
@@ -1609,8 +1656,7 @@ impl Luz {
Some(_) => {
self.logic
.clone()
- .update_sender
- .send(UpdateMessage::Error(Error::AlreadyConnected))
+ .handle_connection_error(ConnectionError::AlreadyConnected)
.await;
}
None => {
@@ -1646,11 +1692,12 @@ impl Luz {
}
Err(e) => {
tracing::error!("error: {}", e);
- let _ = self.logic.clone().update_sender.send(
- UpdateMessage::Error(Error::Connecting(
- ConnectionError::ConnectionFailed(e.into()),
- )),
- );
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::ConnectionFailed(
+ e.into(),
+ ))
+ .await;
}
}
}
@@ -1658,10 +1705,9 @@ impl Luz {
}
LuzMessage::Disconnect => match self.connected {
None => {
- let _ = self
- .logic
- .update_sender
- .send(UpdateMessage::Error(Error::AlreadyDisconnected))
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyDisconnected)
.await;
}
ref mut c => {
@@ -1691,76 +1737,15 @@ impl Luz {
}
}
-// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
-// #[derive(Clone)]
-#[derive(Debug)]
-pub struct LuzHandle {
- sender: mpsc::Sender<LuzMessage>,
- timeout: Duration,
-}
-
-impl Clone for LuzHandle {
- fn clone(&self) -> Self {
- Self {
- sender: self.sender.clone(),
- timeout: self.timeout,
- }
- }
-}
-
-impl Deref for LuzHandle {
- type Target = mpsc::Sender<LuzMessage>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for LuzHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl LuzHandle {
- // TODO: database creation separate
- pub fn new(jid: JID, password: String, logic: LogicState) -> Self {
- let (command_sender, command_receiver) = mpsc::channel(20);
-
- // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
- let (_sup_send, sup_recv) = oneshot::channel();
- let sup_recv = sup_recv.fuse();
-
- let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic);
- tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: command_sender,
- // TODO: configure timeout
- timeout: Duration::from_secs(10),
- }
- }
-
- pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(LuzMessage::Connect).await?;
- Ok(())
- }
-
- pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
- self.send(LuzMessage::Disconnect).await?;
- Ok(())
- }
-}
-
// TODO: generate methods for each with a macro
-pub enum LuzMessage {
+pub enum LuzMessage<C> {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
Connect,
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect,
/// TODO: generics
- Command(Command),
+ Command(C),
}
#[derive(Debug, Clone)]