aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-25 23:29:44 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-25 23:29:44 +0000
commit4dac2dbe1d86d058b191f0d82c781d27479e4f74 (patch)
tree069c609d797581a91bfd50733a4f405877fa1b97
parentd797061786a2909ced74a8310181cac40c8e99f9 (diff)
downloadluz-4dac2dbe1d86d058b191f0d82c781d27479e4f74.tar.gz
luz-4dac2dbe1d86d058b191f0d82c781d27479e4f74.tar.bz2
luz-4dac2dbe1d86d058b191f0d82c781d27479e4f74.zip
refactor(luz): error types
-rw-r--r--luz/Cargo.toml1
-rw-r--r--luz/src/connection/mod.rs20
-rw-r--r--luz/src/connection/read.rs56
-rw-r--r--luz/src/connection/write.rs15
-rw-r--r--luz/src/error.rs202
-rw-r--r--luz/src/lib.rs289
-rw-r--r--luz/src/presence.rs9
7 files changed, 365 insertions, 227 deletions
diff --git a/luz/Cargo.toml b/luz/Cargo.toml
index 45a847f..90d321c 100644
--- a/luz/Cargo.toml
+++ b/luz/Cargo.toml
@@ -16,3 +16,4 @@ tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
uuid = { version = "1.13.1", features = ["v4"] }
+thiserror = "2.0.11"
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index 95aae1a..84109bc 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -20,7 +20,7 @@ use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{
db::Db,
- error::{Error, Reason},
+ error::{Error, ReadError, WriteError},
UpdateMessage,
};
@@ -36,7 +36,7 @@ pub struct Supervisor {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@@ -62,7 +62,7 @@ pub enum State {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
),
),
}
@@ -77,7 +77,7 @@ impl Supervisor {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@@ -180,7 +180,7 @@ impl Supervisor {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_state.close();
while let Some(msg) = write_state.recv().await {
- let _ = msg.respond_to.send(Err(Reason::LostConnection));
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@@ -227,9 +227,9 @@ impl Supervisor {
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_recv.close();
- let _ = write_msg.respond_to.send(Err(Reason::LostConnection));
+ let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
while let Some(msg) = write_recv.recv().await {
- let _ = msg.respond_to.send(Err(Reason::LostConnection));
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error to send?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@@ -278,10 +278,10 @@ impl Supervisor {
// if reconnection failure, respond to all current messages with lost connection error.
write_receiver.close();
if let Some(msg) = retry_msg {
- msg.respond_to.send(Err(Reason::LostConnection));
+ msg.respond_to.send(Err(WriteError::LostConnection));
}
while let Some(msg) = write_receiver.recv().await {
- msg.respond_to.send(Err(Reason::LostConnection));
+ msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@@ -342,7 +342,7 @@ impl SupervisorHandle {
on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>,
password: Arc<String>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 4390e00..0590ce3 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -18,16 +18,13 @@ use uuid::Uuid;
use crate::{
chat::{Body, Message},
db::Db,
- error::{Error, IqError, PresenceError, Reason, RecvMessageError},
+ error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, Show},
roster::Contact,
UpdateMessage,
};
-use super::{
- write::{WriteHandle, WriteMessage},
- SupervisorCommand,
-};
+use super::{write::WriteHandle, SupervisorCommand};
pub struct Read {
control_receiver: mpsc::Receiver<ReadControl>,
@@ -38,7 +35,7 @@ pub struct Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -48,7 +45,7 @@ pub struct Read {
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
// TODO: use proper stanza ids
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
}
impl Read {
@@ -61,7 +58,7 @@ impl Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -69,9 +66,9 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
- let (send, recv) = oneshot::channel();
+ let (_send, recv) = oneshot::channel();
Self {
control_receiver,
stream,
@@ -162,7 +159,7 @@ impl Read {
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(Reason::LostConnection));
+ let _ = sender.send(Err(ReadError::LostConnection));
}
}
}
@@ -178,7 +175,7 @@ async fn handle_stanza(
db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) {
match stanza {
Stanza::Message(stanza_message) => {
@@ -207,7 +204,9 @@ async fn handle_stanza(
if let Err(e) = result {
tracing::error!("messagecreate");
let _ = update_sender
- .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
.await;
}
let _ = update_sender
@@ -215,8 +214,8 @@ async fn handle_stanza(
.await;
} else {
let _ = update_sender
- .send(UpdateMessage::Error(Error::RecvMessage(
- RecvMessageError::MissingFrom,
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
)))
.await;
}
@@ -229,9 +228,16 @@ async fn handle_stanza(
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(PresenceError::Error(
- Reason::Stanza(presence.errors.first().cloned()),
- ))))
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
.await;
}
// should not happen (error to server)
@@ -329,8 +335,8 @@ async fn handle_stanza(
let contact: Contact = item.into();
if let Err(e) = db.upsert_contact(contact.clone()).await {
let _ = update_sender
- .send(UpdateMessage::Error(Error::CacheUpdate(
- e.into(),
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
)))
.await;
}
@@ -381,7 +387,7 @@ pub enum ReadControl {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
),
}
@@ -414,13 +420,13 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -451,14 +457,14 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
index 2273fac..3333d38 100644
--- a/luz/src/connection/write.rs
+++ b/luz/src/connection/write.rs
@@ -7,7 +7,7 @@ use tokio::{
task::JoinHandle,
};
-use crate::error::{Error, Reason};
+use crate::error::WriteError;
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
@@ -17,9 +17,10 @@ pub struct Write {
on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
}
+#[derive(Debug)]
pub struct WriteMessage {
pub stanza: Stanza,
- pub respond_to: oneshot::Sender<Result<(), Reason>>,
+ pub respond_to: oneshot::Sender<Result<(), WriteError>>,
}
pub enum WriteControl {
@@ -84,9 +85,9 @@ impl Write {
Err(e) => match &e {
peanuts::Error::ReadError(_error) => {
// if connection lost during disconnection, just send lost connection error to the write requests
- let _ = msg.respond_to.send(Err(Reason::LostConnection));
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
while let Some(msg) = self.stanza_receiver.recv().await {
- let _ = msg.respond_to.send(Err(Reason::LostConnection));
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
break;
}
@@ -140,16 +141,16 @@ pub struct WriteHandle {
}
impl WriteHandle {
- pub async fn write(&self, stanza: Stanza) -> Result<(), Reason> {
+ pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> {
let (send, recv) = oneshot::channel();
self.send(WriteMessage {
stanza,
respond_to: send,
})
.await
- .map_err(|_| Reason::ChannelSend)?;
+ .map_err(|e| WriteError::Actor(e.into()))?;
// TODO: timeout
- recv.await?
+ recv.await.map_err(|e| WriteError::Actor(e.into()))?
}
}
diff --git a/luz/src/error.rs b/luz/src/error.rs
index f0b956e..efd3937 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -1,138 +1,158 @@
+use std::sync::Arc;
+
use stanza::client::Stanza;
-use tokio::sync::oneshot::{self};
+use thiserror::Error;
+use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
-#[derive(Debug)]
+#[derive(Debug, Error, Clone)]
pub enum Error {
+ #[error("already connected")]
AlreadyConnected,
// TODO: change to Connecting(ConnectingError)
- Connection(ConnectionError),
- Presence(PresenceError),
- SetStatus(Reason),
- Roster(Reason),
- Stream(stanza::stream::Error),
- SendMessage(Reason),
- RecvMessage(RecvMessageError),
+ #[error("connecting: {0}")]
+ Connecting(#[from] ConnectionError),
+ #[error("presence: {0}")]
+ Presence(#[from] PresenceError),
+ #[error("set status: {0}")]
+ SetStatus(#[from] StatusError),
+ // TODO: have different ones for get/update/set
+ #[error("roster: {0}")]
+ Roster(RosterError),
+ #[error("stream error: {0}")]
+ Stream(#[from] stanza::stream::Error),
+ #[error("message send error: {0}")]
+ MessageSend(MessageSendError),
+ #[error("message receive error: {0}")]
+ MessageRecv(MessageRecvError),
+ #[error("already disconnected")]
AlreadyDisconnected,
+ #[error("lost connection")]
LostConnection,
- // TODO: should all cache update errors include the context?
- CacheUpdate(Reason),
+ // TODO: Display for Content
+ #[error("received unrecognized/unsupported content: {0:?}")]
UnrecognizedContent(peanuts::element::Content),
+ #[error("iq receive error: {0}")]
Iq(IqError),
- Cloned,
+ #[error("disconnected")]
+ Disconnected,
}
-// TODO: this is horrifying, maybe just use tracing to forward error events???
-impl Clone for Error {
- fn clone(&self) -> Self {
- Error::Cloned
- }
+#[derive(Debug, Error, Clone)]
+pub enum MessageSendError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
}
-#[derive(Debug)]
+#[derive(Debug, Error, Clone)]
pub enum PresenceError {
- Error(Reason),
+ #[error("unsupported")]
Unsupported,
+ #[error("missing from")]
MissingFrom,
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
}
-#[derive(Debug)]
+#[derive(Debug, Error, Clone)]
+// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc.
pub enum IqError {
+ #[error("no iq with id matching `{0}`")]
NoMatchingId(String),
}
-#[derive(Debug)]
-pub enum RecvMessageError {
+#[derive(Debug, Error, Clone)]
+pub enum MessageRecvError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
+ #[error("missing from")]
MissingFrom,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Error)]
pub enum ConnectionError {
- ConnectionFailed(Reason),
- RosterRetreival(Reason),
- SendPresence(Reason),
- NoCachedStatus(Reason),
-}
-
-#[derive(Debug)]
-pub struct RosterError(pub Reason);
-
-impl From<RosterError> for Error {
- fn from(e: RosterError) -> Self {
- Self::Roster(e.0)
- }
-}
-
-impl From<RosterError> for ConnectionError {
- fn from(e: RosterError) -> Self {
- Self::RosterRetreival(e.0)
- }
+ #[error("connection failed: {0}")]
+ ConnectionFailed(#[from] jabber::Error),
+ #[error("failed roster retreival: {0}")]
+ RosterRetreival(#[from] RosterError),
+ #[error("failed to send available presence: {0}")]
+ SendPresence(#[from] WriteError),
+ #[error("cached status: {0}")]
+ StatusCacheError(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum RosterError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
+ // TODO: display for stanza, to show as xml, same for read error types.
+ #[error("unexpected reply: {0:?}")]
+ UnexpectedStanza(Stanza),
+ #[error("stream read: {0}")]
+ Read(#[from] ReadError),
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
}
-pub struct StatusError(pub Reason);
+#[derive(Debug, Error, Clone)]
+#[error("database error: {0}")]
+pub struct DatabaseError(Arc<sqlx::Error>);
-impl From<StatusError> for Error {
- fn from(e: StatusError) -> Self {
- Error::SetStatus(e.0)
+impl From<sqlx::Error> for DatabaseError {
+ fn from(e: sqlx::Error) -> Self {
+ Self(Arc::new(e))
}
}
-impl From<StatusError> for ConnectionError {
- fn from(e: StatusError) -> Self {
- Self::SendPresence(e.0)
- }
+#[derive(Debug, Error, Clone)]
+pub enum StatusError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
}
-#[derive(Debug)]
-pub enum Reason {
- // TODO: organisastion of error into internal error thing
- Timeout,
- Stream(stanza::stream_error::Error),
- Stanza(Option<stanza::client::error::Error>),
- Jabber(jabber::Error),
- XML(peanuts::Error),
- SQL(sqlx::Error),
- // JID(jid::ParseError),
+#[derive(Debug, Error, Clone)]
+pub enum WriteError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
LostConnection,
- OneshotRecv(oneshot::error::RecvError),
- UnexpectedStanza(Stanza),
+ // TODO: should this be in writeerror or separate?
+ #[error("actor: {0}")]
+ Actor(#[from] ActorError),
+ #[error("disconnected")]
Disconnected,
- ChannelSend,
- Cloned,
-}
-
-// TODO: same here
-impl Clone for Reason {
- fn clone(&self) -> Self {
- Reason::Cloned
- }
}
-impl From<oneshot::error::RecvError> for Reason {
- fn from(e: oneshot::error::RecvError) -> Reason {
- Self::OneshotRecv(e)
- }
+// TODO: separate peanuts read and write error?
+#[derive(Debug, Error, Clone)]
+pub enum ReadError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
}
-impl From<peanuts::Error> for Reason {
- fn from(e: peanuts::Error) -> Self {
- Self::XML(e)
- }
+#[derive(Debug, Error, Clone)]
+pub enum ActorError {
+ #[error("receive timed out")]
+ Timeout,
+ #[error("could not send message to actor, channel closed")]
+ Send,
+ #[error("could not receive message from actor, channel closed")]
+ Receive,
}
-// impl From<jid::ParseError> for Reason {
-// fn from(e: jid::ParseError) -> Self {
-// Self::JID(e)
-// }
-// }
-
-impl From<sqlx::Error> for Reason {
- fn from(e: sqlx::Error) -> Self {
- Self::SQL(e)
+impl<T> From<SendError<T>> for ActorError {
+ fn from(_e: SendError<T>) -> Self {
+ Self::Send
}
}
-impl From<jabber::Error> for Reason {
- fn from(e: jabber::Error) -> Self {
- Self::Jabber(e)
+impl From<RecvError> for ActorError {
+ fn from(_e: RecvError) -> Self {
+ Self::Receive
}
}
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 293bc08..91bcfdf 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -7,7 +7,7 @@ use std::{
use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
-use error::{ConnectionError, Reason, RosterError, StatusError};
+use error::{ConnectionError, DatabaseError, ReadError, RosterError, StatusError, WriteError};
use futures::{future::Fuse, FutureExt};
use jabber::JID;
use presence::{Offline, Online, Presence};
@@ -21,7 +21,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
};
-use tracing::{debug, info, Instrument};
+use tracing::{debug, info};
use user::User;
use uuid::Uuid;
@@ -44,7 +44,7 @@ pub struct Luz {
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
@@ -159,8 +159,8 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
- Error::Connection(
- ConnectionError::NoCachedStatus(
+ Error::Connecting(
+ ConnectionError::StatusCacheError(
e.into(),
),
),
@@ -170,16 +170,20 @@ impl Luz {
}
};
let (send, recv) = oneshot::channel();
- CommandMessage::SetStatus(online.clone(), send)
- .handle_online(
- writer.clone(),
- supervisor.sender(),
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- self.pending_iqs.clone(),
- )
- .await;
+ CommandMessage::SendPresence(
+ None,
+ Presence::Online(online.clone()),
+ send,
+ )
+ .handle_online(
+ writer.clone(),
+ supervisor.sender(),
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ self.pending_iqs.clone(),
+ )
+ .await;
let set_status = recv.await;
match set_status {
Ok(s) => match s {
@@ -198,13 +202,13 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
- Error::Connection(e.into()),
+ Error::Connecting(e.into()),
))
.await;
}
},
Err(e) => {
- let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await;
+ let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await;
}
}
}
@@ -212,7 +216,7 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
- Error::Connection(e.into()),
+ Error::Connecting(e.into()),
))
.await;
}
@@ -221,8 +225,12 @@ impl Luz {
Err(e) => {
let _ = self
.sender
- .send(UpdateMessage::Error(Error::Connection(
- ConnectionError::RosterRetreival(e.into()),
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionError::RosterRetreival(
+ RosterError::Write(WriteError::Actor(
+ e.into(),
+ )),
+ ),
)))
.await;
}
@@ -230,7 +238,7 @@ impl Luz {
}
Err(e) => {
let _ =
- self.sender.send(UpdateMessage::Error(Error::Connection(
+ self.sender.send(UpdateMessage::Error(Error::Connecting(
ConnectionError::ConnectionFailed(e.into()),
)));
}
@@ -286,7 +294,7 @@ impl Luz {
impl CommandMessage {
pub async fn handle_offline(
- mut self,
+ self,
jid: Arc<Mutex<JID>>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -301,7 +309,7 @@ impl CommandMessage {
let _ = sender.send(Ok(roster));
}
Err(e) => {
- let _ = sender.send(Err(RosterError(e.into())));
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
}
}
}
@@ -331,45 +339,48 @@ impl CommandMessage {
}
// TODO: offline queue to modify roster
CommandMessage::AddContact(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::BuddyRequest(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::SubscriptionRequest(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::AcceptBuddyRequest(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnsubscribeFromContact(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnsubscribeContact(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnfriendContact(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
}
CommandMessage::DeleteContact(jid, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::UpdateContact(jid, contact_update, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::SetStatus(online, sender) => {
let result = db
.upsert_cached_status(online)
.await
- .map_err(|e| StatusError(e.into()));
+ .map_err(|e| StatusError::Cache(e.into()));
sender.send(result);
}
// TODO: offline message queue
CommandMessage::SendMessage(jid, body, sender) => {
- sender.send(Err(Reason::Disconnected));
+ sender.send(Err(WriteError::Disconnected));
+ }
+ CommandMessage::SendPresence(jid, presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
}
}
}
@@ -382,7 +393,7 @@ impl CommandMessage {
client_jid: Arc<Mutex<JID>>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) {
match self {
CommandMessage::Connect => unreachable!(),
@@ -424,11 +435,12 @@ impl CommandMessage {
Ok(Ok(())) => info!("roster request sent"),
Ok(Err(e)) => {
// TODO: log errors if fail to send
- let _ = result_sender.send(Err(RosterError(e.into())));
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
return;
}
Err(e) => {
- let _ = result_sender.send(Err(RosterError(e.into())));
+ let _ = result_sender
+ .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
};
@@ -448,23 +460,41 @@ impl CommandMessage {
items.into_iter().map(|item| item.into()).collect();
if let Err(e) = db.replace_cached_roster(contacts.clone()).await {
update_sender
- .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
.await;
};
result_sender.send(Ok(contacts));
return;
}
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
s => {
- result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s))));
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Ok(Err(e)) => {
- result_sender.send(Err(RosterError(e.into())));
+ result_sender.send(Err(RosterError::Read(e)));
return;
}
Err(e) => {
- result_sender.send(Err(RosterError(e.into())));
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@@ -525,8 +555,8 @@ impl CommandMessage {
}
// TODO: write_handle send helper function
let result = write_handle.write(set_stanza).await;
- if let Err(_) = result {
- sender.send(result);
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@@ -545,24 +575,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
- Stanza::Iq(Iq {
+ ref s @ Stanza::Iq(Iq {
from: _,
- id,
+ ref id,
to: _,
r#type,
lang: _,
query: _,
- errors,
- }) if id == iq_id && r#type == IqType::Error => {
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
- sender.send(Err(Reason::Stanza(Some(error.clone()))));
+ sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
- sender.send(Err(Reason::Stanza(None)));
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
- sender.send(Err(Reason::UnexpectedStanza(s)));
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@@ -572,7 +602,7 @@ impl CommandMessage {
}
},
Err(e) => {
- sender.send(Err(e.into()));
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@@ -770,8 +800,8 @@ impl CommandMessage {
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
- if let Err(_) = result {
- sender.send(result);
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@@ -790,24 +820,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
- Stanza::Iq(Iq {
+ ref s @ Stanza::Iq(Iq {
from: _,
- id,
+ ref id,
to: _,
r#type,
lang: _,
query: _,
- errors,
- }) if id == iq_id && r#type == IqType::Error => {
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
- sender.send(Err(Reason::Stanza(Some(error.clone()))));
+ sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
- sender.send(Err(Reason::Stanza(None)));
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
- sender.send(Err(Reason::UnexpectedStanza(s)));
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@@ -817,7 +847,7 @@ impl CommandMessage {
}
},
Err(e) => {
- sender.send(Err(e.into()));
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@@ -858,8 +888,8 @@ impl CommandMessage {
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
- if let Err(_) = result {
- sender.send(result);
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@@ -878,24 +908,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
- Stanza::Iq(Iq {
+ ref s @ Stanza::Iq(Iq {
from: _,
- id,
+ ref id,
to: _,
r#type,
lang: _,
query: _,
- errors,
- }) if id == iq_id && r#type == IqType::Error => {
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
- sender.send(Err(Reason::Stanza(Some(error.clone()))));
+ sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
- sender.send(Err(Reason::Stanza(None)));
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
- sender.send(Err(Reason::UnexpectedStanza(s)));
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@@ -905,7 +935,7 @@ impl CommandMessage {
}
},
Err(e) => {
- sender.send(Err(e.into()));
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@@ -914,13 +944,16 @@ impl CommandMessage {
let result = db.upsert_cached_status(online.clone()).await;
if let Err(e) = result {
let _ = update_sender
- .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
.await;
}
let result = write_handle
.write(Stanza::Presence(online.into()))
.await
- .map_err(|e| StatusError(e));
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
// TODO: offline message queue
@@ -956,7 +989,9 @@ impl CommandMessage {
};
if let Err(e) = db.create_message(message, jid).await.map_err(|e| e.into())
{
- let _ = update_sender.send(UpdateMessage::Error(Error::CacheUpdate(e)));
+ let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend(
+ error::MessageSendError::MessageHistory(e),
+ )));
}
let _ = sender.send(Ok(()));
}
@@ -965,6 +1000,15 @@ impl CommandMessage {
}
}
}
+ CommandMessage::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = write_handle.write(Stanza::Presence(presence)).await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
}
}
}
@@ -999,11 +1043,12 @@ impl DerefMut for LuzHandle {
}
impl LuzHandle {
+ // TODO: database creation separate
pub async fn new(
jid: JID,
password: String,
db: &str,
- ) -> Result<(Self, mpsc::Receiver<UpdateMessage>), Reason> {
+ ) -> Result<(Self, mpsc::Receiver<UpdateMessage>), DatabaseError> {
let db = SqlitePool::connect(db).await?;
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
@@ -1030,8 +1075,59 @@ impl LuzHandle {
update_receiver,
))
}
+
+ pub async fn connect(&self) {
+ self.send(CommandMessage::Connect).await;
+ }
+
+ pub async fn disconnect(&self, offline: Offline) {
+ self.send(CommandMessage::Disconnect(offline)).await;
+ }
+
+ // pub async fn get_roster(&self) -> Result<Vec<Contact>, RosterError> {
+ // let (send, recv) = oneshot::channel();
+ // self.send(CommandMessage::GetRoster(send)).await.map_err(|e| RosterError::)?;
+ // Ok(recv.await?)
+ // }
+
+ // pub async fn get_chats(&self) -> Result<Vec<Chat>, Error> {}
+
+ // pub async fn get_chat(&self, jid: JID) -> Result<Chat, Error> {}
+
+ // pub async fn get_messages(&self, jid: JID) -> Result<Vec<Message>, Error> {}
+
+ // pub async fn delete_chat(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn delete_message(&self, id: Uuid) -> Result<(), Error> {}
+
+ // pub async fn get_user(&self, jid: JID) -> Result<User, Error> {}
+
+ // pub async fn add_contact(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn buddy_request(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn subscription_request(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn accept_subscription_request(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn unfriend_contact(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn delete_contact(&self, jid: JID) -> Result<(), Error> {}
+
+ // pub async fn update_contact(&self, jid: JID, update: ContactUpdate) -> Result<(), Error> {}
+
+ // pub async fn set_status(&self, online: Online) -> Result<(), Error> {}
+
+ // pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), Error> {}
}
+// TODO: generate methods for each with a macro
pub enum CommandMessage {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
@@ -1042,46 +1138,51 @@ pub enum CommandMessage {
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
- GetChats(oneshot::Sender<Result<Vec<Chat>, Reason>>),
+ GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
/// get a specific chat by jid
- GetChat(JID, oneshot::Sender<Result<Chat, Reason>>),
+ GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
- GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Reason>>),
+ GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// delete a chat from your chat history, along with all the corresponding messages
- DeleteChat(JID, oneshot::Sender<Result<(), Reason>>),
+ DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
- DeleteMessage(Uuid, oneshot::Sender<Result<(), Reason>>),
+ DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
- GetUser(JID, oneshot::Sender<Result<User, Reason>>),
+ GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
- // TODO: for all these, consider returning with oneshot::Sender<Result<(), Error>>
- AddContact(JID, oneshot::Sender<Result<(), Reason>>),
+ AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
- BuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
+ BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
- SubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
- AcceptBuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
- AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// unsubscribe to a contact, but don't remove their subscription.
- UnsubscribeFromContact(JID, oneshot::Sender<Result<(), Reason>>),
+ UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
- UnsubscribeContact(JID, oneshot::Sender<Result<(), Reason>>),
+ UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
- UnfriendContact(JID, oneshot::Sender<Result<(), Reason>>),
+ UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
- DeleteContact(JID, oneshot::Sender<Result<(), Reason>>),
+ DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
- UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Reason>>),
+ UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
+ /// send presence stanza
+ SendPresence(
+ Option<JID>,
+ Presence,
+ oneshot::Sender<Result<(), WriteError>>,
+ ),
/// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
- SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),
+ SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
#[derive(Debug, Clone)]
diff --git a/luz/src/presence.rs b/luz/src/presence.rs
index 31a0d30..4bc1993 100644
--- a/luz/src/presence.rs
+++ b/luz/src/presence.rs
@@ -110,3 +110,12 @@ impl From<Offline> for stanza::client::presence::Presence {
}
}
}
+
+impl From<Presence> for stanza::client::presence::Presence {
+ fn from(value: Presence) -> Self {
+ match value {
+ Presence::Online(online) => online.into(),
+ Presence::Offline(offline) => offline.into(),
+ }
+ }
+}