aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.helix/languages.toml2
-rw-r--r--Cargo.toml4
-rw-r--r--filamento/Cargo.toml17
-rw-r--r--filamento/README.md3
-rw-r--r--filamento/filamento.dbbin0 -> 90112 bytes
-rw-r--r--filamento/migrations/20240113011930_luz.sql (renamed from luz/migrations/20240113011930_luz.sql)0
-rw-r--r--filamento/src/chat.rs (renamed from luz/src/chat.rs)0
-rw-r--r--filamento/src/db.rs (renamed from luz/src/db/mod.rs)0
-rw-r--r--filamento/src/error.rs142
-rw-r--r--filamento/src/lib.rs1598
-rw-r--r--filamento/src/presence.rs (renamed from luz/src/presence.rs)0
-rw-r--r--filamento/src/roster.rs (renamed from luz/src/roster.rs)0
-rw-r--r--filamento/src/user.rs (renamed from luz/src/user.rs)0
-rw-r--r--jabber/Cargo.toml42
-rw-r--r--jabber/src/error.rs58
-rw-r--r--jabber/src/lib.rs25
-rw-r--r--lampada/.gitignore (renamed from luz/.gitignore)0
-rw-r--r--lampada/Cargo.toml17
-rw-r--r--lampada/README.md3
-rw-r--r--lampada/scratch (renamed from luz/scratch)3
-rw-r--r--lampada/src/connection/mod.rs (renamed from luz/src/connection/mod.rs)19
-rw-r--r--lampada/src/connection/read.rs (renamed from luz/src/connection/read.rs)15
-rw-r--r--lampada/src/connection/write.rs (renamed from luz/src/connection/write.rs)2
-rw-r--r--lampada/src/error.rs82
-rw-r--r--lampada/src/lib.rs238
-rw-r--r--lampada/src/main.rs (renamed from luz/src/main.rs)6
-rw-r--r--luz/Cargo.lock (renamed from jabber/Cargo.lock)0
-rw-r--r--luz/Cargo.toml46
-rw-r--r--luz/src/client.rs (renamed from jabber/src/client.rs)0
-rw-r--r--luz/src/connection.rs (renamed from jabber/src/connection.rs)0
-rw-r--r--luz/src/error.rs246
-rw-r--r--luz/src/jabber_stream.rs (renamed from jabber/src/jabber_stream.rs)0
-rw-r--r--luz/src/jabber_stream/bound_stream.rs (renamed from jabber/src/jabber_stream/bound_stream.rs)0
-rw-r--r--luz/src/lib.rs1785
34 files changed, 2220 insertions, 2133 deletions
diff --git a/.helix/languages.toml b/.helix/languages.toml
index 9e34dc3..ad628c8 100644
--- a/.helix/languages.toml
+++ b/.helix/languages.toml
@@ -1,4 +1,4 @@
[language-server.rust-analyzer]
command = "rust-analyzer"
-environment = { "DATABASE_URL" = "sqlite://luz/luz.db" }
+environment = { "DATABASE_URL" = "sqlite://filamento/filamento.db" }
config = { cargo.features = "all" }
diff --git a/Cargo.toml b/Cargo.toml
index f3137ce..a9daa7a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,6 @@ resolver = "2"
members = [
"luz",
- "jabber",
- "stanza", "jid",
+ "lampada",
+ "stanza", "jid", "filamento",
]
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml
new file mode 100644
index 0000000..e25024a
--- /dev/null
+++ b/filamento/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "filamento"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+futures = "0.3.31"
+lampada = { version = "0.1.0", path = "../lampada" }
+tokio = "1.42.0"
+thiserror = "2.0.11"
+stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
+sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
+# TODO: re-export jid?
+jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
+uuid = { version = "1.13.1", features = ["v4"] }
+tracing = "0.1.41"
+chrono = "0.4.40"
diff --git a/filamento/README.md b/filamento/README.md
new file mode 100644
index 0000000..57b4135
--- /dev/null
+++ b/filamento/README.md
@@ -0,0 +1,3 @@
+# filament
+
+a high-level xmpp chat client using luz
diff --git a/filamento/filamento.db b/filamento/filamento.db
new file mode 100644
index 0000000..5c3c720
--- /dev/null
+++ b/filamento/filamento.db
Binary files differ
diff --git a/luz/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql
index 148598b..148598b 100644
--- a/luz/migrations/20240113011930_luz.sql
+++ b/filamento/migrations/20240113011930_luz.sql
diff --git a/luz/src/chat.rs b/filamento/src/chat.rs
index c1194ea..c1194ea 100644
--- a/luz/src/chat.rs
+++ b/filamento/src/chat.rs
diff --git a/luz/src/db/mod.rs b/filamento/src/db.rs
index aea40ac..aea40ac 100644
--- a/luz/src/db/mod.rs
+++ b/filamento/src/db.rs
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
new file mode 100644
index 0000000..996a503
--- /dev/null
+++ b/filamento/src/error.rs
@@ -0,0 +1,142 @@
+use std::sync::Arc;
+
+use lampada::error::{ConnectionError, ReadError, WriteError};
+use stanza::client::Stanza;
+use thiserror::Error;
+
+pub use lampada::error::CommandError;
+
+// for the client logic impl
+#[derive(Debug, Error, Clone)]
+pub enum Error {
+ #[error("core error: {0}")]
+ Connection(#[from] ConnectionError),
+ #[error("received unrecognized/unsupported content")]
+ UnrecognizedContent,
+ // TODO: include content
+ // UnrecognizedContent(peanuts::element::Content),
+ #[error("iq receive error: {0}")]
+ Iq(IqError),
+ // TODO: change to Connecting(ConnectingError)
+ #[error("connecting: {0}")]
+ Connecting(#[from] ConnectionJobError),
+ #[error("presence: {0}")]
+ Presence(#[from] PresenceError),
+ #[error("set status: {0}")]
+ SetStatus(#[from] StatusError),
+ // TODO: have different ones for get/update/set
+ #[error("roster: {0}")]
+ Roster(RosterError),
+ #[error("stream error: {0}")]
+ Stream(#[from] stanza::stream::Error),
+ #[error("message send error: {0}")]
+ MessageSend(MessageSendError),
+ #[error("message receive error: {0}")]
+ MessageRecv(MessageRecvError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum MessageSendError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum MessageRecvError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
+ #[error("missing from")]
+ MissingFrom,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum StatusError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
+}
+
+#[derive(Debug, Clone, Error)]
+pub enum ConnectionJobError {
+ // #[error("connection failed: {0}")]
+ // ConnectionFailed(#[from] luz::Error),
+ #[error("failed roster retreival: {0}")]
+ RosterRetreival(#[from] RosterError),
+ #[error("failed to send available presence: {0}")]
+ SendPresence(#[from] WriteError),
+ #[error("cached status: {0}")]
+ StatusCacheError(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum RosterError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
+ // TODO: display for stanza, to show as xml, same for read error types.
+ #[error("unexpected reply: {0:?}")]
+ UnexpectedStanza(Stanza),
+ #[error("stream read: {0}")]
+ Read(#[from] ReadError),
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
+}
+
+#[derive(Debug, Error, Clone)]
+#[error("database error: {0}")]
+pub struct DatabaseError(Arc<sqlx::Error>);
+
+impl From<sqlx::Error> for DatabaseError {
+ fn from(e: sqlx::Error) -> Self {
+ Self(Arc::new(e))
+ }
+}
+
+impl From<sqlx::Error> for DatabaseOpenError {
+ fn from(e: sqlx::Error) -> Self {
+ Self::Error(Arc::new(e))
+ }
+}
+
+#[derive(Debug, Error, Clone)]
+// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc.
+pub enum IqError {
+ #[error("no iq with id matching `{0}`")]
+ NoMatchingId(String),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum DatabaseOpenError {
+ #[error("error: {0}")]
+ Error(Arc<sqlx::Error>),
+ #[error("migration: {0}")]
+ Migration(Arc<sqlx::migrate::MigrateError>),
+ #[error("io: {0}")]
+ Io(Arc<tokio::io::Error>),
+ #[error("invalid path")]
+ InvalidPath,
+}
+
+impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
+ fn from(e: sqlx::migrate::MigrateError) -> Self {
+ Self::Migration(Arc::new(e))
+ }
+}
+
+impl From<tokio::io::Error> for DatabaseOpenError {
+ fn from(e: tokio::io::Error) -> Self {
+ Self::Io(Arc::new(e))
+ }
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum PresenceError {
+ #[error("unsupported")]
+ Unsupported,
+ #[error("missing from")]
+ MissingFrom,
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
+}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
new file mode 100644
index 0000000..db59a67
--- /dev/null
+++ b/filamento/src/lib.rs
@@ -0,0 +1,1598 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use chat::{Body, Chat, Message};
+use chrono::Utc;
+use db::Db;
+use error::{
+ ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
+ RosterError, StatusError,
+};
+use futures::FutureExt;
+use jid::JID;
+use lampada::{
+ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
+ error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
+};
+use presence::{Offline, Online, Presence, PresenceType, Show};
+use roster::{Contact, ContactUpdate};
+use stanza::client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+};
+use tokio::{
+ sync::{Mutex, mpsc, oneshot},
+ time::timeout,
+};
+use tracing::{debug, info};
+use user::User;
+use uuid::Uuid;
+
+pub mod chat;
+pub mod db;
+pub mod error;
+pub mod presence;
+pub mod roster;
+pub mod user;
+
+pub enum Command {
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory
+ GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
+ // TODO: paging and filtering
+ GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ /// get a specific chat by jid
+ GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// delete a chat from your chat history, along with all the corresponding messages
+ DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
+ /// delete a message from your chat history
+ DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
+ /// get a user from your users database
+ GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
+ /// add a contact to your roster, with a status of none, no subscriptions.
+ AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
+ BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// unsubscribe to a contact, but don't remove their subscription.
+ UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// stop a contact from being subscribed, but stay subscribed to the contact.
+ UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove subscriptions to and from contact, but keep in roster.
+ UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
+ DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
+ UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
+ /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
+ SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
+ /// send presence stanza
+ // TODO: cache presence stanza
+ SendPresence(
+ Option<JID>,
+ PresenceType,
+ oneshot::Sender<Result<(), WriteError>>,
+ ),
+ /// send a directed presence (usually to a non-contact).
+ // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
+ /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
+ /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
+ SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
+}
+/// an xmpp client that is suited for a chat client use case
+#[derive(Debug)]
+pub struct Client {
+ sender: mpsc::Sender<CoreClientCommand<Command>>,
+ timeout: Duration,
+}
+
+impl Clone for Client {
+ fn clone(&self) -> Self {
+ Self {
+ sender: self.sender.clone(),
+ timeout: self.timeout,
+ }
+ }
+}
+
+impl Deref for Client {
+ type Target = mpsc::Sender<CoreClientCommand<Command>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for Client {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl Client {
+ pub async fn connect(&self) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Connect).await?;
+ Ok(())
+ }
+
+ pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Disconnect).await?;
+ Ok(())
+ }
+
+ pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (update_send, update_recv) = mpsc::channel(20);
+
+ // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
+ let (_sup_send, sup_recv) = oneshot::channel();
+ let sup_recv = sup_recv.fuse();
+
+ let logic = ClientLogic {
+ db,
+ pending: Arc::new(Mutex::new(HashMap::new())),
+ update_sender: update_send,
+ };
+
+ let actor: CoreClient<ClientLogic> =
+ CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
+ tokio::spawn(async move { actor.run().await });
+
+ (
+ Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ },
+ update_recv,
+ )
+ }
+
+ pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRoster(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
+ pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChats(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessages(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chat = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chat)
+ }
+
+ pub async fn get_messages(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
+ pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_subscription_request(
+ &self,
+ jid: JID,
+ ) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::AcceptSubscriptionRequest(jid, send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnfriendContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn update_contact(
+ &self,
+ jid: JID,
+ update: ContactUpdate,
+ ) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UpdateContact(
+ jid, update, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SendMessage(
+ jid, body, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+}
+
+#[derive(Clone)]
+pub struct ClientLogic {
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+}
+
+impl Logic for ClientLogic {
+ type Cmd = Command;
+
+ async fn handle_connect(self, connection: Connected) {
+ let (send, recv) = oneshot::channel();
+ debug!("getting roster");
+ self.clone()
+ .handle_online(Command::GetRoster(send), connection.clone())
+ .await;
+ debug!("sent roster req");
+ let roster = recv.await;
+ debug!("got roster");
+ match roster {
+ Ok(r) => match r {
+ Ok(roster) => {
+ let online = self.db.read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::StatusCacheError(e.into()),
+ )))
+ .await;
+ Online::default()
+ }
+ };
+ let (send, recv) = oneshot::channel();
+ self.clone()
+ .handle_online(
+ Command::SendPresence(None, PresenceType::Online(online.clone()), send),
+ connection,
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Online(online, roster))
+ .await;
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
+ )))
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ e.into(),
+ ))),
+ )))
+ .await;
+ }
+ }
+ }
+
+ async fn handle_disconnect(self, connection: Connected) {
+ // TODO: be able to set offline status message
+ let offline_presence: stanza::client::presence::Presence =
+ Offline::default().into_stanza(None);
+ let stanza = Stanza::Presence(offline_presence);
+ // TODO: timeout and error check
+ connection.write_handle().write(stanza).await;
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Offline(Offline::default()))
+ .await;
+ }
+
+ async fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) {
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = self
+ .db
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ tracing::error!("messagecreate");
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
+ .await;
+ }
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
+ },
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = self.pending.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ self.db.delete_contact(item.jid.clone()).await;
+ self.update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) =
+ self.db.upsert_contact(contact.clone()).await
+ {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
+ .await;
+ }
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ // TODO: send error to write_thread
+ }
+ }
+ }
+
+ async fn handle_online(self, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
+ // TODO: jid resource should probably be stored within the connection
+ debug!("before client_jid lock");
+ debug!("after client_jid lock");
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = connection
+ .write_handle()
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ // TODO: timeout
+ match recv.await {
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ = result_sender
+ .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ };
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
+ self.update_sender
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
+ .await;
+ };
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError::Read(e)));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ // TODO: write_handle send helper function
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::BuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SubscriptionRequest(jid, sender) => {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::AcceptBuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::AcceptSubscriptionRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeFromContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnfriendContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::DeleteContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::UpdateContact(jid, contact_update, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self.db.upsert_cached_status(online.clone()).await;
+ if let Err(e) = result {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
+ .await;
+ }
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(jid, body, sender) => {
+ let id = Uuid::new_v4();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ delay: None,
+ });
+ let _ = sender.send(Ok(()));
+ // let _ = sender.send(Ok(message.clone()));
+ let result = connection.write_handle().write(message).await;
+ match result {
+ Ok(_) => {
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp: Utc::now(),
+ };
+ info!("send message {:?}", message);
+ if let Err(e) = self
+ .db
+ .create_message_with_self_resource_and_chat(
+ message.clone(),
+ jid.clone(),
+ )
+ .await
+ .map_err(|e| e.into())
+ {
+ tracing::error!("{}", e);
+ let _ =
+ self.update_sender
+ .send(UpdateMessage::Error(Error::MessageSend(
+ error::MessageSendError::MessageHistory(e),
+ )));
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ }
+ Err(_) => {
+ // let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ }
+ }
+
+ async fn handle_offline(self, command: Command) {
+ match command {
+ Command::GetRoster(sender) => {
+ let roster = self.db.read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::BuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptBuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptSubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeFromContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnfriendContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::DeleteContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::UpdateContact(_jid, _contact_update, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self
+ .db
+ .upsert_cached_status(online)
+ .await
+ .map_err(|e| StatusError::Cache(e.into()));
+ sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(_jid, _body, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SendPresence(_jid, _presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ }
+ }
+ // pub async fn handle_stream_error(self, error) {}
+ // stanza errors (recoverable)
+ // pub async fn handle_error(self, error: Error) {}
+ // when it aborts, must clear iq map no matter what
+ async fn on_abort(self) {
+ let mut iqs = self.pending.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+
+ async fn handle_connection_error(self, error: ConnectionError) {
+ self.update_sender
+ .send(UpdateMessage::Error(
+ ConnectionError::AlreadyConnected.into(),
+ ))
+ .await;
+ }
+}
+
+impl From<Command> for CoreClientCommand<Command> {
+ fn from(value: Command) -> Self {
+ CoreClientCommand::Command(value)
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum UpdateMessage {
+ Error(Error),
+ Online(Online, Vec<Contact>),
+ Offline(Offline),
+ /// received roster from jabber server (replace full app roster state with this)
+ /// is this needed?
+ FullRoster(Vec<Contact>),
+ /// (only update app roster state, don't replace)
+ RosterUpdate(Contact),
+ RosterDelete(JID),
+ /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
+ Presence {
+ from: JID,
+ presence: Presence,
+ },
+ // TODO: receipts
+ // MessageDispatched(Uuid),
+ Message {
+ to: JID,
+ message: Message,
+ },
+ SubscriptionRequest(jid::JID),
+}
diff --git a/luz/src/presence.rs b/filamento/src/presence.rs
index e35761c..e35761c 100644
--- a/luz/src/presence.rs
+++ b/filamento/src/presence.rs
diff --git a/luz/src/roster.rs b/filamento/src/roster.rs
index 43c32f5..43c32f5 100644
--- a/luz/src/roster.rs
+++ b/filamento/src/roster.rs
diff --git a/luz/src/user.rs b/filamento/src/user.rs
index 9914d14..9914d14 100644
--- a/luz/src/user.rs
+++ b/filamento/src/user.rs
diff --git a/jabber/Cargo.toml b/jabber/Cargo.toml
deleted file mode 100644
index fbb776b..0000000
--- a/jabber/Cargo.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-[package]
-name = "jabber"
-authors = ["cel <cel@bunny.garden>"]
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-async-recursion = "1.0.4"
-async-trait = "0.1.68"
-lazy_static = "1.4.0"
-nanoid = "0.4.0"
-# TODO: remove unneeded features and dependencies
-rsasl = { version = "2.0.1", default_features = false, features = [
- "provider_base64",
- "plain",
- "config_builder",
- "scram-sha-1",
-] }
-tokio = { version = "1.28", features = ["full"] }
-tokio-native-tls = "0.3.1"
-tracing = "0.1.40"
-trust-dns-resolver = "0.22.0"
-try_map = "0.3.1"
-stanza = { version = "0.1.0", path = "../stanza" }
-peanuts = { version = "0.1.0", path = "../../peanuts" }
-jid = { version = "0.1.0", path = "../jid" }
-futures = "0.3.31"
-take_mut = "0.2.2"
-pin-project-lite = "0.2.15"
-pin-project = "1.1.7"
-thiserror = "2.0.11"
-
-[dev-dependencies]
-test-log = { version = "0.2", features = ["trace"] }
-env_logger = "*"
-tracing-subscriber = { version = "0.3", default-features = false, features = [
- "env-filter",
- "fmt",
-] }
-stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0199"] }
diff --git a/jabber/src/error.rs b/jabber/src/error.rs
deleted file mode 100644
index ec60778..0000000
--- a/jabber/src/error.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use std::str::Utf8Error;
-use std::sync::Arc;
-
-use jid::ParseError;
-use rsasl::mechname::MechanismNameError;
-use stanza::client::error::Error as ClientError;
-use stanza::sasl::Failure;
-use stanza::stream::Error as StreamError;
-use thiserror::Error;
-
-#[derive(Error, Debug, Clone)]
-pub enum Error {
- #[error("connection")]
- Connection,
- #[error("utf8 decode: {0}")]
- Utf8Decode(#[from] Utf8Error),
- #[error("negotiation")]
- Negotiation,
- #[error("tls required")]
- TlsRequired,
- #[error("already connected with tls")]
- AlreadyTls,
- // TODO: specify unsupported feature
- #[error("unsupported feature")]
- Unsupported,
- #[error("jid missing localpart")]
- NoLocalpart,
- #[error("received unexpected element: {0:?}")]
- UnexpectedElement(peanuts::Element),
- #[error("xml error: {0}")]
- XML(#[from] peanuts::Error),
- #[error("sasl error: {0}")]
- SASL(#[from] SASLError),
- #[error("jid error: {0}")]
- JID(#[from] ParseError),
- #[error("client stanza error: {0}")]
- ClientError(#[from] ClientError),
- #[error("stream error: {0}")]
- StreamError(#[from] StreamError),
- #[error("error missing")]
- MissingError,
-}
-
-#[derive(Error, Debug, Clone)]
-pub enum SASLError {
- #[error("sasl error: {0}")]
- SASL(Arc<rsasl::prelude::SASLError>),
- #[error("mechanism error: {0}")]
- MechanismName(#[from] MechanismNameError),
- #[error("authentication failure: {0}")]
- Authentication(#[from] Failure),
-}
-
-impl From<rsasl::prelude::SASLError> for SASLError {
- fn from(e: rsasl::prelude::SASLError) -> Self {
- Self::SASL(Arc::new(e))
- }
-}
diff --git a/jabber/src/lib.rs b/jabber/src/lib.rs
deleted file mode 100644
index 8855ca7..0000000
--- a/jabber/src/lib.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-#![allow(unused_must_use)]
-// #![feature(let_chains)]
-
-// TODO: logging (dropped errors)
-pub mod client;
-pub mod connection;
-pub mod error;
-pub mod jabber_stream;
-
-pub use connection::Connection;
-pub use error::Error;
-pub use jabber_stream::JabberStream;
-pub use jid::JID;
-
-pub type Result<T> = std::result::Result<T, Error>;
-
-pub use client::connect_and_login;
-
-#[cfg(test)]
-mod tests {
- // #[tokio::test]
- // async fn test_login() {
- // crate::login("test@blos.sm/clown", "slayed").await.unwrap();
- // }
-}
diff --git a/luz/.gitignore b/lampada/.gitignore
index 60868fd..60868fd 100644
--- a/luz/.gitignore
+++ b/lampada/.gitignore
diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml
new file mode 100644
index 0000000..856fd7d
--- /dev/null
+++ b/lampada/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "lampada"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+futures = "0.3.31"
+luz = { version = "0.1.0", path = "../luz" }
+peanuts = { version = "0.1.0", path = "../../peanuts" }
+jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
+stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
+tokio = "1.42.0"
+tokio-stream = "0.1.17"
+tokio-util = "0.7.13"
+tracing = "0.1.41"
+tracing-subscriber = "0.3.19"
+thiserror = "2.0.11"
diff --git a/lampada/README.md b/lampada/README.md
new file mode 100644
index 0000000..dc5f016
--- /dev/null
+++ b/lampada/README.md
@@ -0,0 +1,3 @@
+# lampada
+
+a core xmpp client that graciously manages streams, delegating logic to an implementor of a trait.
diff --git a/luz/scratch b/lampada/scratch
index 9954aef..e013ded 100644
--- a/luz/scratch
+++ b/lampada/scratch
@@ -1,3 +1,6 @@
+macaw/céu
+canopy/sol
+
# logic:
- db
diff --git a/luz/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 288de70..1e767b0 100644
--- a/luz/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -7,8 +7,8 @@ use std::{
time::Duration,
};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use jid::JID;
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::client::Stanza;
use tokio::{
@@ -19,9 +19,8 @@ use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
use crate::{
- db::Db,
- error::{ConnectionError, Error, ReadError, WriteError},
- Connected, Logic, LogicState, UpdateMessage,
+ error::{ConnectionError, WriteError},
+ Connected, Logic,
};
mod read;
@@ -84,7 +83,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
match msg {
SupervisorCommand::Disconnect => {
info!("disconnecting");
- // TODO: do handle_disconnect here
+ self.logic
+ .handle_disconnect(self.connected.clone())
+ .await;
let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
info!("sent disconnect command");
@@ -108,11 +109,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
// send abort to read stream, as already done, consider
let (read_state, mut write_state);
match state {
- // TODO: proper state things for read and write thread
ChildState::Write(receiver) => {
write_state = receiver;
let (send, recv) = oneshot::channel();
let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
if let Ok(state) = recv.await {
read_state = state;
} else {
@@ -135,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
// TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
@@ -182,7 +183,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
// TODO: same here
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
@@ -226,7 +227,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
diff --git a/luz/src/connection/read.rs b/lampada/src/connection/read.rs
index 4e55bc5..cc69387 100644
--- a/luz/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -7,24 +7,15 @@ use std::{
time::Duration,
};
-use chrono::{DateTime, Utc};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
-use uuid::Uuid;
-
-use crate::{
- chat::{Body, Message},
- db::Db,
- error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
- presence::{Offline, Online, Presence, PresenceType, Show},
- roster::Contact,
- Connected, Logic, LogicState, UpdateMessage,
-};
+
+use crate::{Connected, Logic};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
diff --git a/luz/src/connection/write.rs b/lampada/src/connection/write.rs
index ff78b81..8f0c34b 100644
--- a/luz/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot},
diff --git a/lampada/src/error.rs b/lampada/src/error.rs
new file mode 100644
index 0000000..cdfb4db
--- /dev/null
+++ b/lampada/src/error.rs
@@ -0,0 +1,82 @@
+use std::sync::Arc;
+
+use stanza::client::Stanza;
+use thiserror::Error;
+use tokio::{
+ sync::{mpsc::error::SendError, oneshot::error::RecvError},
+ time::error::Elapsed,
+};
+
+#[derive(Debug, Error, Clone)]
+pub enum ConnectionError {
+ #[error("connection failed: {0}")]
+ ConnectionFailed(#[from] luz::Error),
+ #[error("already connected")]
+ AlreadyConnected,
+ #[error("already disconnected")]
+ AlreadyDisconnected,
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: Display for Content
+ #[error("disconnected")]
+ Disconnected,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum CommandError<T> {
+ #[error("actor: {0}")]
+ Actor(ActorError),
+ #[error("{0}")]
+ Error(#[from] T),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum WriteError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: should this be in writeerror or separate?
+ #[error("actor: {0}")]
+ Actor(#[from] ActorError),
+ #[error("disconnected")]
+ Disconnected,
+}
+
+// TODO: separate peanuts read and write error?
+// TODO: which crate
+#[derive(Debug, Error, Clone)]
+pub enum ReadError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum ActorError {
+ #[error("receive timed out")]
+ Timeout,
+ #[error("could not send message to actor, channel closed")]
+ Send,
+ #[error("could not receive message from actor, channel closed")]
+ Receive,
+}
+
+impl From<Elapsed> for ActorError {
+ fn from(_e: Elapsed) -> Self {
+ Self::Timeout
+ }
+}
+
+impl<T> From<SendError<T>> for ActorError {
+ fn from(_e: SendError<T>) -> Self {
+ Self::Send
+ }
+}
+
+impl From<RecvError> for ActorError {
+ fn from(_e: RecvError) -> Self {
+ Self::Receive
+ }
+}
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
new file mode 100644
index 0000000..c61c596
--- /dev/null
+++ b/lampada/src/lib.rs
@@ -0,0 +1,238 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+pub use connection::write::WriteMessage;
+pub use connection::SupervisorSender;
+use error::ConnectionError;
+use futures::{future::Fuse, FutureExt};
+use luz::JID;
+use stanza::client::{
+ iq::{self, Iq, IqType},
+ Stanza,
+};
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::JoinSet,
+ time::timeout,
+};
+use tracing::{debug, info};
+
+use crate::connection::write::WriteHandle;
+use crate::connection::{SupervisorCommand, SupervisorHandle};
+
+mod connection;
+pub mod error;
+
+#[derive(Clone)]
+pub struct Connected {
+ // full jid will stay stable across reconnections
+ jid: JID,
+ write_handle: WriteHandle,
+}
+
+impl Connected {
+ pub fn jid(&self) -> &JID {
+ &self.jid
+ }
+
+ pub fn write_handle(&self) -> &WriteHandle {
+ &self.write_handle
+ }
+}
+
+/// everything that a particular xmpp client must implement
+pub trait Logic {
+ /// the command message type
+ type Cmd;
+
+ /// run after binding to the stream (e.g. for a chat client, )
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ fn handle_disconnect(
+ self,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run to handle an incoming xmpp stanza
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when a connection is currently established
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when disconnected
+ fn handle_offline(
+ self,
+ command: Self::Cmd,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// handle connection errors from the core client logic
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ // async fn handle_stream_error(self, error) {}
+}
+
+/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
+pub struct CoreClient<Lgc: Logic> {
+ jid: JID,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ password: Arc<String>,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // connected_intention: bool,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ // config: LampConfig,
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
+ tasks: JoinSet<()>,
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+ /// create a new actor
+ pub fn new(
+ jid: JID,
+ password: String,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ jid,
+ password: Arc::new(password),
+ connected,
+ receiver,
+ connection_supervisor_shutdown,
+ logic,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ /// run the actor
+ pub async fn run(mut self) {
+ loop {
+ let msg = tokio::select! {
+ // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
+ // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
+ _ = &mut self.connection_supervisor_shutdown => {
+ self.connected = None;
+ continue;
+ }
+ Some(msg) = self.receiver.recv() => {
+ msg
+ },
+ else => break,
+ };
+ match msg {
+ CoreClientCommand::Connect => {
+ match self.connected {
+ Some(_) => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyConnected)
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ luz::connect_and_login(&mut jid, &*self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ debug!("ok stream result");
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ shutdown_send,
+ jid.clone(),
+ self.password.clone(),
+ self.logic.clone(),
+ );
+
+ let shutdown_recv = shutdown_recv.fuse();
+ self.connection_supervisor_shutdown = shutdown_recv;
+
+ let connected = Connected {
+ jid,
+ write_handle: writer,
+ };
+
+ self.logic.clone().handle_connect(connected.clone()).await;
+
+ self.connected = Some((connected, supervisor));
+ }
+ Err(e) => {
+ tracing::error!("error: {}", e);
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::ConnectionFailed(
+ e.into(),
+ ))
+ .await;
+ }
+ }
+ }
+ };
+ }
+ CoreClientCommand::Disconnect => match self.connected {
+ None => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyDisconnected)
+ .await;
+ }
+ ref mut c => {
+ if let Some((connected, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ CoreClientCommand::Command(command) => {
+ match self.connected.as_ref() {
+ Some((w, s)) => self
+ .tasks
+ .spawn(self.logic.clone().handle_online(command, w.clone())),
+ None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ };
+ }
+ }
+ }
+ }
+}
+
+// TODO: generate methods for each with a macro
+pub enum CoreClientCommand<C> {
+ // TODO: login invisible xep-0186
+ /// connect to XMPP chat server. gets roster and publishes initial presence.
+ Connect,
+ /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
+ Disconnect,
+ /// TODO: generics
+ Command(C),
+}
diff --git a/luz/src/main.rs b/lampada/src/main.rs
index 5aeef14..7b7469d 100644
--- a/luz/src/main.rs
+++ b/lampada/src/main.rs
@@ -1,7 +1,7 @@
use std::{path::Path, str::FromStr, time::Duration};
use jid::JID;
-use luz::{db::Db, LuzHandle, LuzMessage};
+use lampada::{db::Db, CoreClientCommand, LuzHandle};
use sqlx::SqlitePool;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@@ -24,11 +24,11 @@ async fn main() {
}
});
- luz.send(LuzMessage::Connect).await.unwrap();
+ luz.send(CoreClientCommand::Connect).await.unwrap();
let (send, recv) = oneshot::channel();
tokio::time::sleep(Duration::from_secs(5)).await;
info!("sending message");
- luz.send(LuzMessage::SendMessage(
+ luz.send(CoreClientCommand::SendMessage(
JID::from_str("cel@blos.sm").unwrap(),
luz::chat::Body {
body: "hallo!!!".to_string(),
diff --git a/jabber/Cargo.lock b/luz/Cargo.lock
index d45d7c1..d45d7c1 100644
--- a/jabber/Cargo.lock
+++ b/luz/Cargo.lock
diff --git a/luz/Cargo.toml b/luz/Cargo.toml
index 08a0d6c..c1c1511 100644
--- a/luz/Cargo.toml
+++ b/luz/Cargo.toml
@@ -1,20 +1,42 @@
[package]
name = "luz"
+authors = ["cel <cel@bunny.garden>"]
version = "0.1.0"
edition = "2021"
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
[dependencies]
-futures = "0.3.31"
-jabber = { version = "0.1.0", path = "../jabber" }
+async-recursion = "1.0.4"
+async-trait = "0.1.68"
+lazy_static = "1.4.0"
+nanoid = "0.4.0"
+# TODO: remove unneeded features and dependencies
+rsasl = { version = "2.0.1", default_features = false, features = [
+ "provider_base64",
+ "plain",
+ "config_builder",
+ "scram-sha-1",
+] }
+tokio = { version = "1.28", features = ["full"] }
+tokio-native-tls = "0.3.1"
+tracing = "0.1.40"
+trust-dns-resolver = "0.22.0"
+try_map = "0.3.1"
+stanza = { version = "0.1.0", path = "../stanza" }
peanuts = { version = "0.1.0", path = "../../peanuts" }
-jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
-sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
-stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
-tokio = "1.42.0"
-tokio-stream = "0.1.17"
-tokio-util = "0.7.13"
-tracing = "0.1.41"
-tracing-subscriber = "0.3.19"
-uuid = { version = "1.13.1", features = ["v4"] }
+jid = { version = "0.1.0", path = "../jid" }
+futures = "0.3.31"
+take_mut = "0.2.2"
+pin-project-lite = "0.2.15"
+pin-project = "1.1.7"
thiserror = "2.0.11"
-chrono = "0.4.40"
+
+[dev-dependencies]
+test-log = { version = "0.2", features = ["trace"] }
+env_logger = "*"
+tracing-subscriber = { version = "0.3", default-features = false, features = [
+ "env-filter",
+ "fmt",
+] }
+stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0199"] }
diff --git a/jabber/src/client.rs b/luz/src/client.rs
index de2be08..de2be08 100644
--- a/jabber/src/client.rs
+++ b/luz/src/client.rs
diff --git a/jabber/src/connection.rs b/luz/src/connection.rs
index b185eca..b185eca 100644
--- a/jabber/src/connection.rs
+++ b/luz/src/connection.rs
diff --git a/luz/src/error.rs b/luz/src/error.rs
index 46f45a8..ec60778 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -1,214 +1,58 @@
+use std::str::Utf8Error;
use std::sync::Arc;
-use stanza::client::Stanza;
+use jid::ParseError;
+use rsasl::mechname::MechanismNameError;
+use stanza::client::error::Error as ClientError;
+use stanza::sasl::Failure;
+use stanza::stream::Error as StreamError;
use thiserror::Error;
-use tokio::{
- sync::{mpsc::error::SendError, oneshot::error::RecvError},
- time::error::Elapsed,
-};
-#[derive(Debug, Error, Clone)]
-pub enum ConnectionError {
- #[error("connection failed: {0}")]
- ConnectionFailed(#[from] jabber::Error),
- #[error("already connected")]
- AlreadyConnected,
- #[error("already disconnected")]
- AlreadyDisconnected,
- #[error("lost connection")]
- LostConnection,
- // TODO: Display for Content
- #[error("disconnected")]
- Disconnected,
-}
-
-// for the client logic impl
-#[derive(Debug, Error, Clone)]
+#[derive(Error, Debug, Clone)]
pub enum Error {
- #[error("core error: {0}")]
- Connection(#[from] ConnectionError),
- #[error("received unrecognized/unsupported content: {0:?}")]
- UnrecognizedContent(peanuts::element::Content),
- #[error("iq receive error: {0}")]
- Iq(IqError),
- // TODO: change to Connecting(ConnectingError)
- #[error("connecting: {0}")]
- Connecting(#[from] ConnectionJobError),
- #[error("presence: {0}")]
- Presence(#[from] PresenceError),
- #[error("set status: {0}")]
- SetStatus(#[from] StatusError),
- // TODO: have different ones for get/update/set
- #[error("roster: {0}")]
- Roster(RosterError),
- #[error("stream error: {0}")]
- Stream(#[from] stanza::stream::Error),
- #[error("message send error: {0}")]
- MessageSend(MessageSendError),
- #[error("message receive error: {0}")]
- MessageRecv(MessageRecvError),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum CommandError<T> {
- #[error("actor: {0}")]
- Actor(ActorError),
- #[error("{0}")]
- Error(#[from] T),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum MessageSendError {
- #[error("could not add to message history: {0}")]
- MessageHistory(#[from] DatabaseError),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum PresenceError {
- #[error("unsupported")]
+ #[error("connection")]
+ Connection,
+ #[error("utf8 decode: {0}")]
+ Utf8Decode(#[from] Utf8Error),
+ #[error("negotiation")]
+ Negotiation,
+ #[error("tls required")]
+ TlsRequired,
+ #[error("already connected with tls")]
+ AlreadyTls,
+ // TODO: specify unsupported feature
+ #[error("unsupported feature")]
Unsupported,
- #[error("missing from")]
- MissingFrom,
- #[error("stanza error: {0}")]
- StanzaError(#[from] stanza::client::error::Error),
-}
-
-#[derive(Debug, Error, Clone)]
-// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc.
-pub enum IqError {
- #[error("no iq with id matching `{0}`")]
- NoMatchingId(String),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum MessageRecvError {
- #[error("could not add to message history: {0}")]
- MessageHistory(#[from] DatabaseError),
- #[error("missing from")]
- MissingFrom,
-}
-
-#[derive(Debug, Clone, Error)]
-pub enum ConnectionJobError {
- #[error("connection failed: {0}")]
- ConnectionFailed(#[from] jabber::Error),
- #[error("failed roster retreival: {0}")]
- RosterRetreival(#[from] RosterError),
- #[error("failed to send available presence: {0}")]
- SendPresence(#[from] WriteError),
- #[error("cached status: {0}")]
- StatusCacheError(#[from] DatabaseError),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum RosterError {
- #[error("cache: {0}")]
- Cache(#[from] DatabaseError),
- #[error("stream write: {0}")]
- Write(#[from] WriteError),
- // TODO: display for stanza, to show as xml, same for read error types.
- #[error("unexpected reply: {0:?}")]
- UnexpectedStanza(Stanza),
- #[error("stream read: {0}")]
- Read(#[from] ReadError),
- #[error("stanza error: {0}")]
- StanzaError(#[from] stanza::client::error::Error),
-}
-
-#[derive(Debug, Error, Clone)]
-#[error("database error: {0}")]
-pub struct DatabaseError(Arc<sqlx::Error>);
-
-#[derive(Debug, Error, Clone)]
-pub enum DatabaseOpenError {
- #[error("error: {0}")]
- Error(Arc<sqlx::Error>),
- #[error("migration: {0}")]
- Migration(Arc<sqlx::migrate::MigrateError>),
- #[error("io: {0}")]
- Io(Arc<tokio::io::Error>),
- #[error("invalid path")]
- InvalidPath,
-}
-
-impl From<sqlx::Error> for DatabaseError {
- fn from(e: sqlx::Error) -> Self {
- Self(Arc::new(e))
- }
-}
-
-impl From<sqlx::Error> for DatabaseOpenError {
- fn from(e: sqlx::Error) -> Self {
- Self::Error(Arc::new(e))
- }
-}
-
-impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
- fn from(e: sqlx::migrate::MigrateError) -> Self {
- Self::Migration(Arc::new(e))
- }
-}
-
-impl From<tokio::io::Error> for DatabaseOpenError {
- fn from(e: tokio::io::Error) -> Self {
- Self::Io(Arc::new(e))
- }
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum StatusError {
- #[error("cache: {0}")]
- Cache(#[from] DatabaseError),
- #[error("stream write: {0}")]
- Write(#[from] WriteError),
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum WriteError {
- #[error("xml: {0}")]
- XML(#[from] peanuts::Error),
- #[error("lost connection")]
- LostConnection,
- // TODO: should this be in writeerror or separate?
- #[error("actor: {0}")]
- Actor(#[from] ActorError),
- #[error("disconnected")]
- Disconnected,
-}
-
-// TODO: separate peanuts read and write error?
-#[derive(Debug, Error, Clone)]
-pub enum ReadError {
- #[error("xml: {0}")]
+ #[error("jid missing localpart")]
+ NoLocalpart,
+ #[error("received unexpected element: {0:?}")]
+ UnexpectedElement(peanuts::Element),
+ #[error("xml error: {0}")]
XML(#[from] peanuts::Error),
- #[error("lost connection")]
- LostConnection,
-}
-
-#[derive(Debug, Error, Clone)]
-pub enum ActorError {
- #[error("receive timed out")]
- Timeout,
- #[error("could not send message to actor, channel closed")]
- Send,
- #[error("could not receive message from actor, channel closed")]
- Receive,
-}
-
-impl From<Elapsed> for ActorError {
- fn from(_e: Elapsed) -> Self {
- Self::Timeout
- }
+ #[error("sasl error: {0}")]
+ SASL(#[from] SASLError),
+ #[error("jid error: {0}")]
+ JID(#[from] ParseError),
+ #[error("client stanza error: {0}")]
+ ClientError(#[from] ClientError),
+ #[error("stream error: {0}")]
+ StreamError(#[from] StreamError),
+ #[error("error missing")]
+ MissingError,
}
-impl<T> From<SendError<T>> for ActorError {
- fn from(_e: SendError<T>) -> Self {
- Self::Send
- }
+#[derive(Error, Debug, Clone)]
+pub enum SASLError {
+ #[error("sasl error: {0}")]
+ SASL(Arc<rsasl::prelude::SASLError>),
+ #[error("mechanism error: {0}")]
+ MechanismName(#[from] MechanismNameError),
+ #[error("authentication failure: {0}")]
+ Authentication(#[from] Failure),
}
-impl From<RecvError> for ActorError {
- fn from(_e: RecvError) -> Self {
- Self::Receive
+impl From<rsasl::prelude::SASLError> for SASLError {
+ fn from(e: rsasl::prelude::SASLError) -> Self {
+ Self::SASL(Arc::new(e))
}
}
diff --git a/jabber/src/jabber_stream.rs b/luz/src/jabber_stream.rs
index 302350d..302350d 100644
--- a/jabber/src/jabber_stream.rs
+++ b/luz/src/jabber_stream.rs
diff --git a/jabber/src/jabber_stream/bound_stream.rs b/luz/src/jabber_stream/bound_stream.rs
index 25b79ff..25b79ff 100644
--- a/jabber/src/jabber_stream/bound_stream.rs
+++ b/luz/src/jabber_stream/bound_stream.rs
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index b9c482c..8855ca7 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -1,1774 +1,25 @@
-use std::{
- collections::HashMap,
- ops::{Deref, DerefMut},
- str::FromStr,
- sync::Arc,
- time::Duration,
-};
+#![allow(unused_must_use)]
+// #![feature(let_chains)]
-use chat::{Body, Chat, Message};
-use chrono::Utc;
-use connection::{write::WriteMessage, SupervisorSender};
-use db::Db;
-use error::{
- ActorError, CommandError, ConnectionError, ConnectionJobError, DatabaseError, IqError,
- MessageRecvError, PresenceError, ReadError, RosterError, StatusError, WriteError,
-};
-use futures::{future::Fuse, FutureExt};
-use jabber::JID;
-use presence::{Offline, Online, Presence, PresenceType, Show};
-use roster::{Contact, ContactUpdate};
-use sqlx::SqlitePool;
-use stanza::client::{
- iq::{self, Iq, IqType},
- Stanza,
-};
-use tokio::{
- sync::{mpsc, oneshot, Mutex},
- task::JoinSet,
- time::timeout,
-};
-use tracing::{debug, info};
-use user::User;
-use uuid::Uuid;
-
-use crate::connection::write::WriteHandle;
-use crate::connection::{SupervisorCommand, SupervisorHandle};
-use crate::error::Error;
-
-pub mod chat;
-mod connection;
-pub mod db;
+// TODO: logging (dropped errors)
+pub mod client;
+pub mod connection;
pub mod error;
-pub mod presence;
-pub mod roster;
-pub mod user;
-
-pub enum Command {
- /// get the roster. if offline, retreive cached version from database. should be stored in application memory
- GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
- /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
- // TODO: paging and filtering
- GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
- // TODO: paging and filtering
- GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
- // TODO: paging and filtering
- GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
- /// get a specific chat by jid
- GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
- /// get message history for chat (does appropriate mam things)
- // TODO: paging and filtering
- GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
- /// delete a chat from your chat history, along with all the corresponding messages
- DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
- /// delete a message from your chat history
- DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
- /// get a user from your users database
- GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
- /// add a contact to your roster, with a status of none, no subscriptions.
- AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
- /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
- BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
- SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
- AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
- AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// unsubscribe to a contact, but don't remove their subscription.
- UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// stop a contact from being subscribed, but stay subscribed to the contact.
- UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// remove subscriptions to and from contact, but keep in roster.
- UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
- DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
- /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
- UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
- /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
- SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
- /// send presence stanza
- // TODO: cache presence stanza
- SendPresence(
- Option<JID>,
- PresenceType,
- oneshot::Sender<Result<(), WriteError>>,
- ),
- /// send a directed presence (usually to a non-contact).
- // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
- /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
- /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
- SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
-}
-
-#[derive(Debug)]
-pub struct Client {
- sender: mpsc::Sender<LuzMessage<Command>>,
- timeout: Duration,
-}
-
-impl Clone for Client {
- fn clone(&self) -> Self {
- Self {
- sender: self.sender.clone(),
- timeout: self.timeout,
- }
- }
-}
-
-impl Deref for Client {
- type Target = mpsc::Sender<LuzMessage<Command>>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for Client {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl Client {
- pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(LuzMessage::Connect).await?;
- Ok(())
- }
-
- pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
- self.send(LuzMessage::Disconnect).await?;
- Ok(())
- }
-
- pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
- let (command_sender, command_receiver) = mpsc::channel(20);
- let (update_send, update_recv) = mpsc::channel(20);
-
- // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
- let (_sup_send, sup_recv) = oneshot::channel();
- let sup_recv = sup_recv.fuse();
-
- let logic = LogicState {
- db,
- pending: Arc::new(Mutex::new(HashMap::new())),
- update_sender: update_send,
- };
-
- let actor: Luz<LogicState> =
- Luz::new(jid, password, command_receiver, None, sup_recv, logic);
- tokio::spawn(async move { actor.run().await });
-
- (
- Self {
- sender: command_sender,
- // TODO: configure timeout
- timeout: Duration::from_secs(10),
- },
- update_recv,
- )
- }
-
- pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetRoster(send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let roster = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(roster)
- }
-
- pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetChats(send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetChatsOrdered(send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chats_ordered_with_latest_messages(
- &self,
- ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(
- Command::GetChatsOrderedWithLatestMessages(send),
- ))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetChat(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chat = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chat)
- }
-
- pub async fn get_messages(
- &self,
- jid: JID,
- ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetMessages(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let messages = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(messages)
- }
-
- pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::DeleteChat(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::DeleteMessage(id, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::GetUser(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::AddContact(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::BuddyRequest(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::SubscriptionRequest(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::AcceptBuddyRequest(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
+pub mod jabber_stream;
- pub async fn accept_subscription_request(
- &self,
- jid: JID,
- ) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::AcceptSubscriptionRequest(
- jid, send,
- )))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
+pub use connection::Connection;
+pub use error::Error;
+pub use jabber_stream::JabberStream;
+pub use jid::JID;
- pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::UnsubscribeFromContact(
- jid, send,
- )))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
+pub type Result<T> = std::result::Result<T, Error>;
- pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::UnsubscribeContact(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::UnfriendContact(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::DeleteContact(jid, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn update_contact(
- &self,
- jid: JID,
- update: ContactUpdate,
- ) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::UpdateContact(
- jid, update, send,
- )))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::SetStatus(online, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(LuzMessage::Command(Command::SendMessage(jid, body, send)))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-}
-
-#[derive(Clone)]
-pub struct LogicState {
- db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- update_sender: mpsc::Sender<UpdateMessage>,
-}
-
-impl Logic for LogicState {
- type Cmd = Command;
-
- async fn handle_connect(self, connection: Connected) {
- let (send, recv) = oneshot::channel();
- debug!("getting roster");
- self.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
- .await;
- debug!("sent roster req");
- let roster = recv.await;
- debug!("got roster");
- match roster {
- Ok(r) => match r {
- Ok(roster) => {
- let online = self.db.read_cached_status().await;
- let online = match online {
- Ok(online) => online,
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::StatusCacheError(e.into()),
- )))
- .await;
- Online::default()
- }
- };
- let (send, recv) = oneshot::channel();
- self.clone()
- .handle_online(
- Command::SendPresence(None, PresenceType::Online(online.clone()), send),
- connection,
- )
- .await;
- let set_status = recv.await;
- match set_status {
- Ok(s) => match s {
- Ok(()) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Online(online, roster))
- .await;
- }
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(e.into())))
- .await;
- }
- },
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
- )))
- .await;
- }
- }
- }
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(e.into())))
- .await;
- }
- },
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
- e.into(),
- ))),
- )))
- .await;
- }
- }
- }
-
- async fn handle_disconnect(self, connection: Connected) {
- // TODO: be able to set offline status message
- let offline_presence: stanza::client::presence::Presence =
- Offline::default().into_stanza(None);
- let stanza = Stanza::Presence(offline_presence);
- // TODO: timeout and error check
- connection.write_handle.write(stanza).await;
- let _ = self
- .update_sender
- .send(UpdateMessage::Offline(Offline::default()))
- .await;
- }
-
- async fn handle_stanza(
- self,
- stanza: Stanza,
- connection: Connected,
- supervisor: SupervisorSender,
- ) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
- },
- };
- // TODO: can this be more efficient?
- let result = self
- .db
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = self
- .update_sender
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = self
- .update_sender
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = self
- .update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = self
- .update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = self.pending.lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- self.db.delete_contact(item.jid.clone()).await;
- self.update_sender
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) =
- self.db.upsert_contact(contact.clone()).await
- {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
- let _ = self
- .update_sender
- .send(UpdateMessage::RosterUpdate(contact))
- .await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
- }
- }
- }
- }
- // TODO: send unsupported to server
- _ => {}
- }
- } else {
- // TODO: send error (unsupported) to server
- }
- }
- },
- Stanza::Error(error) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
- }
- Stanza::OtherContent(content) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
- // TODO: send error to write_thread
- }
- }
- }
-
- async fn handle_online(self, command: Command, connection: Connected) {
- match command {
- Command::GetRoster(result_sender) => {
- // TODO: jid resource should probably be stored within the connection
- debug!("before client_jid lock");
- debug!("after client_jid lock");
- let iq_id = Uuid::new_v4().to_string();
- let (send, iq_recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid),
- id: iq_id.to_string(),
- to: None,
- r#type: IqType::Get,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: Vec::new(),
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- let _ = connection
- .write_handle
- .send(WriteMessage {
- stanza,
- respond_to: send,
- })
- .await;
- // TODO: timeout
- match recv.await {
- Ok(Ok(())) => info!("roster request sent"),
- Ok(Err(e)) => {
- // TODO: log errors if fail to send
- let _ = result_sender.send(Err(RosterError::Write(e.into())));
- return;
- }
- Err(e) => {
- let _ = result_sender
- .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- };
- // TODO: timeout
- match iq_recv.await {
- Ok(Ok(stanza)) => match stanza {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- let contacts: Vec<Contact> =
- items.into_iter().map(|item| item.into()).collect();
- if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
- self.update_sender
- .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
- e.into(),
- ))))
- .await;
- };
- result_sender.send(Ok(contacts));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- result_sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- result_sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Ok(Err(e)) => {
- result_sender.send(Err(RosterError::Read(e)));
- return;
- }
- Err(e) => {
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::GetChats(sender) => {
- let chats = self.db.read_chats().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrdered(sender) => {
- let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = self
- .db
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChat(jid, sender) => {
- let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetMessages(jid, sender) => {
- let messages = self
- .db
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
- }
- Command::DeleteChat(jid, sender) => {
- let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::DeleteMessage(uuid, sender) => {
- let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::GetUser(jid, sender) => {
- let user = self.db.read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
- }
- // TODO: offline queue to modify roster
- Command::AddContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: None,
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- // TODO: write_handle send helper function
- let result = connection.write_handle.write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::BuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::SubscriptionRequest(jid, sender) => {
- // TODO: i should probably have builders
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- Command::AcceptBuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::AcceptSubscriptionRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnsubscribeFromContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnsubscribeContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnfriendContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle.write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::DeleteContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: Some(stanza::roster::Subscription::Remove),
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle.write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::UpdateContact(jid, contact_update, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let groups = Vec::from_iter(
- contact_update
- .groups
- .into_iter()
- .map(|group| stanza::roster::Group(Some(group))),
- );
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: contact_update.name,
- subscription: None,
- groups,
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle.write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::SetStatus(online, sender) => {
- let result = self.db.upsert_cached_status(online.clone()).await;
- if let Err(e) = result {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
- e.into(),
- ))))
- .await;
- }
- let result = connection
- .write_handle
- .write(Stanza::Presence(online.into_stanza(None)))
- .await
- .map_err(|e| StatusError::Write(e));
- // .map_err(|e| StatusError::Write(e));
- let _ = sender.send(result);
- }
- // TODO: offline message queue
- Command::SendMessage(jid, body, sender) => {
- let id = Uuid::new_v4();
- let message = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid.clone()),
- id: Some(id.to_string()),
- to: Some(jid.clone()),
- // TODO: specify message type
- r#type: stanza::client::message::MessageType::Chat,
- // TODO: lang ?
- lang: None,
- subject: None,
- body: Some(stanza::client::message::Body {
- lang: None,
- body: Some(body.body.clone()),
- }),
- thread: None,
- delay: None,
- });
- let _ = sender.send(Ok(()));
- // let _ = sender.send(Ok(message.clone()));
- let result = connection.write_handle.write(message).await;
- match result {
- Ok(_) => {
- let mut message = Message {
- id,
- from: connection.jid,
- body,
- timestamp: Utc::now(),
- };
- info!("send message {:?}", message);
- if let Err(e) = self
- .db
- .create_message_with_self_resource_and_chat(
- message.clone(),
- jid.clone(),
- )
- .await
- .map_err(|e| e.into())
- {
- tracing::error!("{}", e);
- let _ =
- self.update_sender
- .send(UpdateMessage::Error(Error::MessageSend(
- error::MessageSendError::MessageHistory(e),
- )));
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = self
- .update_sender
- .send(UpdateMessage::Message { to: jid, message })
- .await;
- }
- Err(_) => {
- // let _ = sender.send(result);
- }
- }
- }
- Command::SendPresence(jid, presence, sender) => {
- let mut presence: stanza::client::presence::Presence = presence.into();
- if let Some(jid) = jid {
- presence.to = Some(jid);
- };
- let result = connection
- .write_handle
- .write(Stanza::Presence(presence))
- .await;
- // .map_err(|e| StatusError::Write(e));
- let _ = sender.send(result);
- }
- }
- }
-
- async fn handle_offline(self, command: Command) {
- match command {
- Command::GetRoster(sender) => {
- let roster = self.db.read_cached_roster().await;
- match roster {
- Ok(roster) => {
- let _ = sender.send(Ok(roster));
- }
- Err(e) => {
- let _ = sender.send(Err(RosterError::Cache(e.into())));
- }
- }
- }
- Command::GetChats(sender) => {
- let chats = self.db.read_chats().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrdered(sender) => {
- let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = self
- .db
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChat(jid, sender) => {
- let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetMessages(jid, sender) => {
- let messages = self
- .db
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
- }
- Command::DeleteChat(jid, sender) => {
- let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::DeleteMessage(uuid, sender) => {
- let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::GetUser(jid, sender) => {
- let user = self.db.read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
- }
- // TODO: offline queue to modify roster
- Command::AddContact(_jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::BuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::SubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::AcceptBuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::AcceptSubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnsubscribeFromContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnsubscribeContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnfriendContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::DeleteContact(_jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::UpdateContact(_jid, _contact_update, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::SetStatus(online, sender) => {
- let result = self
- .db
- .upsert_cached_status(online)
- .await
- .map_err(|e| StatusError::Cache(e.into()));
- sender.send(result);
- }
- // TODO: offline message queue
- Command::SendMessage(_jid, _body, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::SendPresence(_jid, _presence, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- }
- }
- // pub async fn handle_stream_error(self, error) {}
- // stanza errors (recoverable)
- // pub async fn handle_error(self, error: Error) {}
- // when it aborts, must clear iq map no matter what
- async fn on_abort(self) {
- let mut iqs = self.pending.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(ReadError::LostConnection));
- }
- }
-
- async fn handle_connection_error(self, error: ConnectionError) {
- self.update_sender
- .send(UpdateMessage::Error(
- ConnectionError::AlreadyConnected.into(),
- ))
- .await;
- }
-}
-
-#[derive(Clone)]
-pub struct Connected {
- // full jid will stay stable across reconnections
- jid: JID,
- write_handle: WriteHandle,
-}
-
-pub trait Logic {
- type Cmd;
-
- fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
- fn handle_disconnect(
- self,
- connection: Connected,
- ) -> impl std::future::Future<Output = ()> + Send;
- fn handle_stanza(
- self,
- stanza: Stanza,
- connection: Connected,
- supervisor: SupervisorSender,
- ) -> impl std::future::Future<Output = ()> + std::marker::Send;
- fn handle_online(
- self,
- command: Self::Cmd,
- connection: Connected,
- ) -> impl std::future::Future<Output = ()> + std::marker::Send;
- fn handle_offline(
- self,
- command: Self::Cmd,
- ) -> impl std::future::Future<Output = ()> + std::marker::Send;
- fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
- // TODO: look at these
- fn handle_connection_error(
- self,
- error: ConnectionError,
- ) -> impl std::future::Future<Output = ()> + std::marker::Send;
- // async fn handle_stream_error(self, error) {}
-}
-
-pub struct Luz<Lgc: Logic> {
- jid: JID,
- password: Arc<String>,
- receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>,
- // TODO: use a dyn passwordprovider trait to avoid storing password in memory
- connected: Option<(Connected, SupervisorHandle)>,
- // connected_intention: bool,
- /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
- connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
- // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
- // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
- // TODO: genericize
- logic: Lgc,
- // config: LampConfig,
- tasks: JoinSet<()>,
-}
-
-impl<Lgc: Logic + Clone + Send + 'static> Luz<Lgc> {
- fn new(
- jid: JID,
- password: String,
- receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>,
- connected: Option<(Connected, SupervisorHandle)>,
- connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
- logic: Lgc,
- ) -> Self {
- Self {
- jid,
- password: Arc::new(password),
- connected,
- receiver,
- connection_supervisor_shutdown,
- logic,
- tasks: JoinSet::new(),
- }
- }
-
- async fn run(mut self) {
- loop {
- let msg = tokio::select! {
- // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
- // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
- _ = &mut self.connection_supervisor_shutdown => {
- self.connected = None;
- continue;
- }
- Some(msg) = self.receiver.recv() => {
- msg
- },
- else => break,
- };
- // TODO: consider separating disconnect/connect and commands apart from commandmessage
- // TODO: dispatch commands separate tasks
- match msg {
- LuzMessage::Connect => {
- match self.connected {
- Some(_) => {
- self.logic
- .clone()
- .handle_connection_error(ConnectionError::AlreadyConnected)
- .await;
- }
- None => {
- let mut jid = self.jid.clone();
- let mut domain = jid.domainpart.clone();
- // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
- let streams_result =
- jabber::connect_and_login(&mut jid, &*self.password, &mut domain)
- .await;
- match streams_result {
- Ok(s) => {
- debug!("ok stream result");
- let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
- let (writer, supervisor) = SupervisorHandle::new(
- s,
- shutdown_send,
- jid.clone(),
- self.password.clone(),
- self.logic.clone(),
- );
-
- let shutdown_recv = shutdown_recv.fuse();
- self.connection_supervisor_shutdown = shutdown_recv;
-
- let connected = Connected {
- jid,
- write_handle: writer,
- };
-
- self.logic.clone().handle_connect(connected.clone()).await;
-
- self.connected = Some((connected, supervisor));
- }
- Err(e) => {
- tracing::error!("error: {}", e);
- self.logic
- .clone()
- .handle_connection_error(ConnectionError::ConnectionFailed(
- e.into(),
- ))
- .await;
- }
- }
- }
- };
- }
- LuzMessage::Disconnect => match self.connected {
- None => {
- self.logic
- .clone()
- .handle_connection_error(ConnectionError::AlreadyDisconnected)
- .await;
- }
- ref mut c => {
- if let Some((connected, supervisor_handle)) = c.take() {
- // TODO: better disconnect logic, only reflect once actually disconnected
- // TODO: call within supervisor instead
- self.logic
- .clone()
- .handle_disconnect(connected.clone())
- .await;
- let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
- } else {
- unreachable!()
- };
- }
- },
- LuzMessage::Command(command) => {
- match self.connected.as_ref() {
- Some((w, s)) => self
- .tasks
- .spawn(self.logic.clone().handle_online(command, w.clone())),
- None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
- };
- }
- }
- }
- }
-}
-
-// TODO: generate methods for each with a macro
-pub enum LuzMessage<C> {
- // TODO: login invisible xep-0186
- /// connect to XMPP chat server. gets roster and publishes initial presence.
- Connect,
- /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
- Disconnect,
- /// TODO: generics
- Command(C),
-}
+pub use client::connect_and_login;
-#[derive(Debug, Clone)]
-pub enum UpdateMessage {
- Error(Error),
- Online(Online, Vec<Contact>),
- Offline(Offline),
- /// received roster from jabber server (replace full app roster state with this)
- /// is this needed?
- FullRoster(Vec<Contact>),
- /// (only update app roster state, don't replace)
- RosterUpdate(Contact),
- RosterDelete(JID),
- /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
- Presence {
- from: JID,
- presence: Presence,
- },
- // TODO: receipts
- // MessageDispatched(Uuid),
- Message {
- to: JID,
- message: Message,
- },
- SubscriptionRequest(jid::JID),
+#[cfg(test)]
+mod tests {
+ // #[tokio::test]
+ // async fn test_login() {
+ // crate::login("test@blos.sm/clown", "slayed").await.unwrap();
+ // }
}