diff options
-rw-r--r-- | .helix/languages.toml | 2 | ||||
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | filamento/Cargo.toml | 17 | ||||
-rw-r--r-- | filamento/README.md | 3 | ||||
-rw-r--r-- | filamento/filamento.db | bin | 0 -> 90112 bytes | |||
-rw-r--r-- | filamento/migrations/20240113011930_luz.sql (renamed from luz/migrations/20240113011930_luz.sql) | 0 | ||||
-rw-r--r-- | filamento/src/chat.rs (renamed from luz/src/chat.rs) | 0 | ||||
-rw-r--r-- | filamento/src/db.rs (renamed from luz/src/db/mod.rs) | 0 | ||||
-rw-r--r-- | filamento/src/error.rs | 142 | ||||
-rw-r--r-- | filamento/src/lib.rs | 1598 | ||||
-rw-r--r-- | filamento/src/presence.rs (renamed from luz/src/presence.rs) | 0 | ||||
-rw-r--r-- | filamento/src/roster.rs (renamed from luz/src/roster.rs) | 0 | ||||
-rw-r--r-- | filamento/src/user.rs (renamed from luz/src/user.rs) | 0 | ||||
-rw-r--r-- | jabber/Cargo.toml | 42 | ||||
-rw-r--r-- | jabber/src/error.rs | 58 | ||||
-rw-r--r-- | jabber/src/lib.rs | 25 | ||||
-rw-r--r-- | lampada/.gitignore (renamed from luz/.gitignore) | 0 | ||||
-rw-r--r-- | lampada/Cargo.toml | 17 | ||||
-rw-r--r-- | lampada/README.md | 3 | ||||
-rw-r--r-- | lampada/scratch (renamed from luz/scratch) | 3 | ||||
-rw-r--r-- | lampada/src/connection/mod.rs (renamed from luz/src/connection/mod.rs) | 19 | ||||
-rw-r--r-- | lampada/src/connection/read.rs (renamed from luz/src/connection/read.rs) | 15 | ||||
-rw-r--r-- | lampada/src/connection/write.rs (renamed from luz/src/connection/write.rs) | 2 | ||||
-rw-r--r-- | lampada/src/error.rs | 82 | ||||
-rw-r--r-- | lampada/src/lib.rs | 238 | ||||
-rw-r--r-- | lampada/src/main.rs (renamed from luz/src/main.rs) | 6 | ||||
-rw-r--r-- | luz/Cargo.lock (renamed from jabber/Cargo.lock) | 0 | ||||
-rw-r--r-- | luz/Cargo.toml | 46 | ||||
-rw-r--r-- | luz/src/client.rs (renamed from jabber/src/client.rs) | 0 | ||||
-rw-r--r-- | luz/src/connection.rs (renamed from jabber/src/connection.rs) | 0 | ||||
-rw-r--r-- | luz/src/error.rs | 246 | ||||
-rw-r--r-- | luz/src/jabber_stream.rs (renamed from jabber/src/jabber_stream.rs) | 0 | ||||
-rw-r--r-- | luz/src/jabber_stream/bound_stream.rs (renamed from jabber/src/jabber_stream/bound_stream.rs) | 0 | ||||
-rw-r--r-- | luz/src/lib.rs | 1785 |
34 files changed, 2220 insertions, 2133 deletions
diff --git a/.helix/languages.toml b/.helix/languages.toml index 9e34dc3..ad628c8 100644 --- a/.helix/languages.toml +++ b/.helix/languages.toml @@ -1,4 +1,4 @@ [language-server.rust-analyzer] command = "rust-analyzer" -environment = { "DATABASE_URL" = "sqlite://luz/luz.db" } +environment = { "DATABASE_URL" = "sqlite://filamento/filamento.db" } config = { cargo.features = "all" } @@ -3,6 +3,6 @@ resolver = "2" members = [ "luz", - "jabber", - "stanza", "jid", + "lampada", + "stanza", "jid", "filamento", ] diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml new file mode 100644 index 0000000..e25024a --- /dev/null +++ b/filamento/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "filamento" +version = "0.1.0" +edition = "2024" + +[dependencies] +futures = "0.3.31" +lampada = { version = "0.1.0", path = "../lampada" } +tokio = "1.42.0" +thiserror = "2.0.11" +stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } +sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } +# TODO: re-export jid? +jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } +uuid = { version = "1.13.1", features = ["v4"] } +tracing = "0.1.41" +chrono = "0.4.40" diff --git a/filamento/README.md b/filamento/README.md new file mode 100644 index 0000000..57b4135 --- /dev/null +++ b/filamento/README.md @@ -0,0 +1,3 @@ +# filament + +a high-level xmpp chat client using luz diff --git a/filamento/filamento.db b/filamento/filamento.db Binary files differnew file mode 100644 index 0000000..5c3c720 --- /dev/null +++ b/filamento/filamento.db diff --git a/luz/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 148598b..148598b 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql diff --git a/luz/src/chat.rs b/filamento/src/chat.rs index c1194ea..c1194ea 100644 --- a/luz/src/chat.rs +++ b/filamento/src/chat.rs diff --git a/luz/src/db/mod.rs b/filamento/src/db.rs index aea40ac..aea40ac 100644 --- a/luz/src/db/mod.rs +++ b/filamento/src/db.rs diff --git a/filamento/src/error.rs b/filamento/src/error.rs new file mode 100644 index 0000000..996a503 --- /dev/null +++ b/filamento/src/error.rs @@ -0,0 +1,142 @@ +use std::sync::Arc; + +use lampada::error::{ConnectionError, ReadError, WriteError}; +use stanza::client::Stanza; +use thiserror::Error; + +pub use lampada::error::CommandError; + +// for the client logic impl +#[derive(Debug, Error, Clone)] +pub enum Error { + #[error("core error: {0}")] + Connection(#[from] ConnectionError), + #[error("received unrecognized/unsupported content")] + UnrecognizedContent, + // TODO: include content + // UnrecognizedContent(peanuts::element::Content), + #[error("iq receive error: {0}")] + Iq(IqError), + // TODO: change to Connecting(ConnectingError) + #[error("connecting: {0}")] + Connecting(#[from] ConnectionJobError), + #[error("presence: {0}")] + Presence(#[from] PresenceError), + #[error("set status: {0}")] + SetStatus(#[from] StatusError), + // TODO: have different ones for get/update/set + #[error("roster: {0}")] + Roster(RosterError), + #[error("stream error: {0}")] + Stream(#[from] stanza::stream::Error), + #[error("message send error: {0}")] + MessageSend(MessageSendError), + #[error("message receive error: {0}")] + MessageRecv(MessageRecvError), +} + +#[derive(Debug, Error, Clone)] +pub enum MessageSendError { + #[error("could not add to message history: {0}")] + MessageHistory(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)] +pub enum MessageRecvError { + #[error("could not add to message history: {0}")] + MessageHistory(#[from] DatabaseError), + #[error("missing from")] + MissingFrom, +} + +#[derive(Debug, Error, Clone)] +pub enum StatusError { + #[error("cache: {0}")] + Cache(#[from] DatabaseError), + #[error("stream write: {0}")] + Write(#[from] WriteError), +} + +#[derive(Debug, Clone, Error)] +pub enum ConnectionJobError { + // #[error("connection failed: {0}")] + // ConnectionFailed(#[from] luz::Error), + #[error("failed roster retreival: {0}")] + RosterRetreival(#[from] RosterError), + #[error("failed to send available presence: {0}")] + SendPresence(#[from] WriteError), + #[error("cached status: {0}")] + StatusCacheError(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)] +pub enum RosterError { + #[error("cache: {0}")] + Cache(#[from] DatabaseError), + #[error("stream write: {0}")] + Write(#[from] WriteError), + // TODO: display for stanza, to show as xml, same for read error types. + #[error("unexpected reply: {0:?}")] + UnexpectedStanza(Stanza), + #[error("stream read: {0}")] + Read(#[from] ReadError), + #[error("stanza error: {0}")] + StanzaError(#[from] stanza::client::error::Error), +} + +#[derive(Debug, Error, Clone)] +#[error("database error: {0}")] +pub struct DatabaseError(Arc<sqlx::Error>); + +impl From<sqlx::Error> for DatabaseError { + fn from(e: sqlx::Error) -> Self { + Self(Arc::new(e)) + } +} + +impl From<sqlx::Error> for DatabaseOpenError { + fn from(e: sqlx::Error) -> Self { + Self::Error(Arc::new(e)) + } +} + +#[derive(Debug, Error, Clone)] +// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc. +pub enum IqError { + #[error("no iq with id matching `{0}`")] + NoMatchingId(String), +} + +#[derive(Debug, Error, Clone)] +pub enum DatabaseOpenError { + #[error("error: {0}")] + Error(Arc<sqlx::Error>), + #[error("migration: {0}")] + Migration(Arc<sqlx::migrate::MigrateError>), + #[error("io: {0}")] + Io(Arc<tokio::io::Error>), + #[error("invalid path")] + InvalidPath, +} + +impl From<sqlx::migrate::MigrateError> for DatabaseOpenError { + fn from(e: sqlx::migrate::MigrateError) -> Self { + Self::Migration(Arc::new(e)) + } +} + +impl From<tokio::io::Error> for DatabaseOpenError { + fn from(e: tokio::io::Error) -> Self { + Self::Io(Arc::new(e)) + } +} + +#[derive(Debug, Error, Clone)] +pub enum PresenceError { + #[error("unsupported")] + Unsupported, + #[error("missing from")] + MissingFrom, + #[error("stanza error: {0}")] + StanzaError(#[from] stanza::client::error::Error), +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs new file mode 100644 index 0000000..db59a67 --- /dev/null +++ b/filamento/src/lib.rs @@ -0,0 +1,1598 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use chat::{Body, Chat, Message}; +use chrono::Utc; +use db::Db; +use error::{ + ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, + RosterError, StatusError, +}; +use futures::FutureExt; +use jid::JID; +use lampada::{ + Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, + error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, +}; +use presence::{Offline, Online, Presence, PresenceType, Show}; +use roster::{Contact, ContactUpdate}; +use stanza::client::{ + Stanza, + iq::{self, Iq, IqType}, +}; +use tokio::{ + sync::{Mutex, mpsc, oneshot}, + time::timeout, +}; +use tracing::{debug, info}; +use user::User; +use uuid::Uuid; + +pub mod chat; +pub mod db; +pub mod error; +pub mod presence; +pub mod roster; +pub mod user; + +pub enum Command { + /// get the roster. if offline, retreive cached version from database. should be stored in application memory + GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), + /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) + // TODO: paging and filtering + GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), + /// get a specific chat by jid + GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), + /// get message history for chat (does appropriate mam things) + // TODO: paging and filtering + GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), + /// delete a chat from your chat history, along with all the corresponding messages + DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), + /// delete a message from your chat history + DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>), + /// get a user from your users database + GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>), + /// add a contact to your roster, with a status of none, no subscriptions. + AddContact(JID, oneshot::Sender<Result<(), RosterError>>), + /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. + BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. + SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. + AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. + AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// unsubscribe to a contact, but don't remove their subscription. + UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// stop a contact from being subscribed, but stay subscribed to the contact. + UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// remove subscriptions to and from contact, but keep in roster. + UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. + DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>), + /// update contact. contact details will be overwritten with the contents of the contactupdate struct. + UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>), + /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. + SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), + /// send presence stanza + // TODO: cache presence stanza + SendPresence( + Option<JID>, + PresenceType, + oneshot::Sender<Result<(), WriteError>>, + ), + /// send a directed presence (usually to a non-contact). + // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) + /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a + /// chatroom). if disconnected, will be cached so when client connects, message will be sent. + SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), +} +/// an xmpp client that is suited for a chat client use case +#[derive(Debug)] +pub struct Client { + sender: mpsc::Sender<CoreClientCommand<Command>>, + timeout: Duration, +} + +impl Clone for Client { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + timeout: self.timeout, + } + } +} + +impl Deref for Client { + type Target = mpsc::Sender<CoreClientCommand<Command>>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl Client { + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(CoreClientCommand::Connect).await?; + Ok(()) + } + + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(CoreClientCommand::Disconnect).await?; + Ok(()) + } + + pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { + let (command_sender, command_receiver) = mpsc::channel(20); + let (update_send, update_recv) = mpsc::channel(20); + + // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) + let (_sup_send, sup_recv) = oneshot::channel(); + let sup_recv = sup_recv.fuse(); + + let logic = ClientLogic { + db, + pending: Arc::new(Mutex::new(HashMap::new())), + update_sender: update_send, + }; + + let actor: CoreClient<ClientLogic> = + CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); + tokio::spawn(async move { actor.run().await }); + + ( + Self { + sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), + }, + update_recv, + ) + } + + pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetRoster(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let roster = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(roster) + } + + pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChats(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered_with_latest_messages( + &self, + ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command( + Command::GetChatsOrderedWithLatestMessages(send), + )) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chat = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chat) + } + + pub async fn get_messages( + &self, + jid: JID, + ) -> Result<Vec<Message>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetMessages(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let messages = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(messages) + } + + pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetUser(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::AddContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SubscriptionRequest( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn accept_subscription_request( + &self, + jid: JID, + ) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command( + Command::AcceptSubscriptionRequest(jid, send), + )) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnsubscribeContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnfriendContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn update_contact( + &self, + jid: JID, + update: ContactUpdate, + ) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UpdateContact( + jid, update, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SetStatus(online, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SendMessage( + jid, body, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } +} + +#[derive(Clone)] +pub struct ClientLogic { + db: Db, + pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + update_sender: mpsc::Sender<UpdateMessage>, +} + +impl Logic for ClientLogic { + type Cmd = Command; + + async fn handle_connect(self, connection: Connected) { + let (send, recv) = oneshot::channel(); + debug!("getting roster"); + self.clone() + .handle_online(Command::GetRoster(send), connection.clone()) + .await; + debug!("sent roster req"); + let roster = recv.await; + debug!("got roster"); + match roster { + Ok(r) => match r { + Ok(roster) => { + let online = self.db.read_cached_status().await; + let online = match online { + Ok(online) => online, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::StatusCacheError(e.into()), + ))) + .await; + Online::default() + } + }; + let (send, recv) = oneshot::channel(); + self.clone() + .handle_online( + Command::SendPresence(None, PresenceType::Online(online.clone()), send), + connection, + ) + .await; + let set_status = recv.await; + match set_status { + Ok(s) => match s { + Ok(()) => { + let _ = self + .update_sender + .send(UpdateMessage::Online(online, roster)) + .await; + } + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::SendPresence(WriteError::Actor(e.into())), + ))) + .await; + } + } + } + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( + e.into(), + ))), + ))) + .await; + } + } + } + + async fn handle_disconnect(self, connection: Connected) { + // TODO: be able to set offline status message + let offline_presence: stanza::client::presence::Presence = + Offline::default().into_stanza(None); + let stanza = Stanza::Presence(offline_presence); + // TODO: timeout and error check + connection.write_handle().write(stanza).await; + let _ = self + .update_sender + .send(UpdateMessage::Offline(Offline::default())) + .await; + } + + async fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) { + match stanza { + Stanza::Message(stanza_message) => { + if let Some(mut from) = stanza_message.from { + // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. + let timestamp = stanza_message + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + // TODO: group chat messages + let mut message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + timestamp, + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = self + .db + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + tracing::error!("messagecreate"); + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MessageHistory(e.into()), + ))) + .await; + } + message.from = message.from.as_bare(); + from = from.as_bare(); + let _ = self + .update_sender + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MissingFrom, + ))) + .await; + } + } + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + ), + ))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = self + .update_sender + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Offline(offline), + }, + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Online(online), + }, + }) + .await; + } + } + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } + } + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = self.pending.lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + self.db.delete_contact(item.jid.clone()).await; + self.update_sender + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = + self.db.upsert_contact(contact.clone()).await + { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), + ))) + .await; + } + let _ = self + .update_sender + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect + } + Stanza::OtherContent(content) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::UnrecognizedContent)); + // TODO: send error to write_thread + } + } + } + + async fn handle_online(self, command: Command, connection: Connected) { + match command { + Command::GetRoster(result_sender) => { + // TODO: jid resource should probably be stored within the connection + debug!("before client_jid lock"); + debug!("after client_jid lock"); + let iq_id = Uuid::new_v4().to_string(); + let (send, iq_recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + let _ = connection + .write_handle() + .send(WriteMessage { + stanza, + respond_to: send, + }) + .await; + // TODO: timeout + match recv.await { + Ok(Ok(())) => info!("roster request sent"), + Ok(Err(e)) => { + // TODO: log errors if fail to send + let _ = result_sender.send(Err(RosterError::Write(e.into()))); + return; + } + Err(e) => { + let _ = result_sender + .send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + }; + // TODO: timeout + match iq_recv.await { + Ok(Ok(stanza)) => match stanza { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = + items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { + self.update_sender + .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( + e.into(), + )))) + .await; + }; + result_sender.send(Ok(contacts)); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + result_sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + result_sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Ok(Err(e)) => { + result_sender.send(Err(RosterError::Read(e))); + return; + } + Err(e) => { + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + // TODO: write_handle send helper function + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::BuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::SubscriptionRequest(jid, sender) => { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::AcceptBuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::AcceptSubscriptionRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeFromContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnfriendContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::DeleteContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::UpdateContact(jid, contact_update, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::SetStatus(online, sender) => { + let result = self.db.upsert_cached_status(online.clone()).await; + if let Err(e) = result { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( + e.into(), + )))) + .await; + } + let result = connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await + .map_err(|e| StatusError::Write(e)); + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(jid, body, sender) => { + let id = Uuid::new_v4(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + delay: None, + }); + let _ = sender.send(Ok(())); + // let _ = sender.send(Ok(message.clone())); + let result = connection.write_handle().write(message).await; + match result { + Ok(_) => { + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp: Utc::now(), + }; + info!("send message {:?}", message); + if let Err(e) = self + .db + .create_message_with_self_resource_and_chat( + message.clone(), + jid.clone(), + ) + .await + .map_err(|e| e.into()) + { + tracing::error!("{}", e); + let _ = + self.update_sender + .send(UpdateMessage::Error(Error::MessageSend( + error::MessageSendError::MessageHistory(e), + ))); + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = self + .update_sender + .send(UpdateMessage::Message { to: jid, message }) + .await; + } + Err(_) => { + // let _ = sender.send(result); + } + } + } + Command::SendPresence(jid, presence, sender) => { + let mut presence: stanza::client::presence::Presence = presence.into(); + if let Some(jid) = jid { + presence.to = Some(jid); + }; + let result = connection + .write_handle() + .write(Stanza::Presence(presence)) + .await; + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + } + } + + async fn handle_offline(self, command: Command) { + match command { + Command::GetRoster(sender) => { + let roster = self.db.read_cached_roster().await; + match roster { + Ok(roster) => { + let _ = sender.send(Ok(roster)); + } + Err(e) => { + let _ = sender.send(Err(RosterError::Cache(e.into()))); + } + } + } + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::BuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptBuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptSubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeFromContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnfriendContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::DeleteContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::UpdateContact(_jid, _contact_update, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::SetStatus(online, sender) => { + let result = self + .db + .upsert_cached_status(online) + .await + .map_err(|e| StatusError::Cache(e.into())); + sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(_jid, _body, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SendPresence(_jid, _presence, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + } + } + // pub async fn handle_stream_error(self, error) {} + // stanza errors (recoverable) + // pub async fn handle_error(self, error: Error) {} + // when it aborts, must clear iq map no matter what + async fn on_abort(self) { + let mut iqs = self.pending.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } + } + + async fn handle_connection_error(self, error: ConnectionError) { + self.update_sender + .send(UpdateMessage::Error( + ConnectionError::AlreadyConnected.into(), + )) + .await; + } +} + +impl From<Command> for CoreClientCommand<Command> { + fn from(value: Command) -> Self { + CoreClientCommand::Command(value) + } +} + +#[derive(Debug, Clone)] +pub enum UpdateMessage { + Error(Error), + Online(Online, Vec<Contact>), + Offline(Offline), + /// received roster from jabber server (replace full app roster state with this) + /// is this needed? + FullRoster(Vec<Contact>), + /// (only update app roster state, don't replace) + RosterUpdate(Contact), + RosterDelete(JID), + /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone + Presence { + from: JID, + presence: Presence, + }, + // TODO: receipts + // MessageDispatched(Uuid), + Message { + to: JID, + message: Message, + }, + SubscriptionRequest(jid::JID), +} diff --git a/luz/src/presence.rs b/filamento/src/presence.rs index e35761c..e35761c 100644 --- a/luz/src/presence.rs +++ b/filamento/src/presence.rs diff --git a/luz/src/roster.rs b/filamento/src/roster.rs index 43c32f5..43c32f5 100644 --- a/luz/src/roster.rs +++ b/filamento/src/roster.rs diff --git a/luz/src/user.rs b/filamento/src/user.rs index 9914d14..9914d14 100644 --- a/luz/src/user.rs +++ b/filamento/src/user.rs diff --git a/jabber/Cargo.toml b/jabber/Cargo.toml deleted file mode 100644 index fbb776b..0000000 --- a/jabber/Cargo.toml +++ /dev/null @@ -1,42 +0,0 @@ -[package] -name = "jabber" -authors = ["cel <cel@bunny.garden>"] -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-recursion = "1.0.4" -async-trait = "0.1.68" -lazy_static = "1.4.0" -nanoid = "0.4.0" -# TODO: remove unneeded features and dependencies -rsasl = { version = "2.0.1", default_features = false, features = [ - "provider_base64", - "plain", - "config_builder", - "scram-sha-1", -] } -tokio = { version = "1.28", features = ["full"] } -tokio-native-tls = "0.3.1" -tracing = "0.1.40" -trust-dns-resolver = "0.22.0" -try_map = "0.3.1" -stanza = { version = "0.1.0", path = "../stanza" } -peanuts = { version = "0.1.0", path = "../../peanuts" } -jid = { version = "0.1.0", path = "../jid" } -futures = "0.3.31" -take_mut = "0.2.2" -pin-project-lite = "0.2.15" -pin-project = "1.1.7" -thiserror = "2.0.11" - -[dev-dependencies] -test-log = { version = "0.2", features = ["trace"] } -env_logger = "*" -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "fmt", -] } -stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0199"] } diff --git a/jabber/src/error.rs b/jabber/src/error.rs deleted file mode 100644 index ec60778..0000000 --- a/jabber/src/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::str::Utf8Error; -use std::sync::Arc; - -use jid::ParseError; -use rsasl::mechname::MechanismNameError; -use stanza::client::error::Error as ClientError; -use stanza::sasl::Failure; -use stanza::stream::Error as StreamError; -use thiserror::Error; - -#[derive(Error, Debug, Clone)] -pub enum Error { - #[error("connection")] - Connection, - #[error("utf8 decode: {0}")] - Utf8Decode(#[from] Utf8Error), - #[error("negotiation")] - Negotiation, - #[error("tls required")] - TlsRequired, - #[error("already connected with tls")] - AlreadyTls, - // TODO: specify unsupported feature - #[error("unsupported feature")] - Unsupported, - #[error("jid missing localpart")] - NoLocalpart, - #[error("received unexpected element: {0:?}")] - UnexpectedElement(peanuts::Element), - #[error("xml error: {0}")] - XML(#[from] peanuts::Error), - #[error("sasl error: {0}")] - SASL(#[from] SASLError), - #[error("jid error: {0}")] - JID(#[from] ParseError), - #[error("client stanza error: {0}")] - ClientError(#[from] ClientError), - #[error("stream error: {0}")] - StreamError(#[from] StreamError), - #[error("error missing")] - MissingError, -} - -#[derive(Error, Debug, Clone)] -pub enum SASLError { - #[error("sasl error: {0}")] - SASL(Arc<rsasl::prelude::SASLError>), - #[error("mechanism error: {0}")] - MechanismName(#[from] MechanismNameError), - #[error("authentication failure: {0}")] - Authentication(#[from] Failure), -} - -impl From<rsasl::prelude::SASLError> for SASLError { - fn from(e: rsasl::prelude::SASLError) -> Self { - Self::SASL(Arc::new(e)) - } -} diff --git a/jabber/src/lib.rs b/jabber/src/lib.rs deleted file mode 100644 index 8855ca7..0000000 --- a/jabber/src/lib.rs +++ /dev/null @@ -1,25 +0,0 @@ -#![allow(unused_must_use)] -// #![feature(let_chains)] - -// TODO: logging (dropped errors) -pub mod client; -pub mod connection; -pub mod error; -pub mod jabber_stream; - -pub use connection::Connection; -pub use error::Error; -pub use jabber_stream::JabberStream; -pub use jid::JID; - -pub type Result<T> = std::result::Result<T, Error>; - -pub use client::connect_and_login; - -#[cfg(test)] -mod tests { - // #[tokio::test] - // async fn test_login() { - // crate::login("test@blos.sm/clown", "slayed").await.unwrap(); - // } -} diff --git a/luz/.gitignore b/lampada/.gitignore index 60868fd..60868fd 100644 --- a/luz/.gitignore +++ b/lampada/.gitignore diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml new file mode 100644 index 0000000..856fd7d --- /dev/null +++ b/lampada/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "lampada" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures = "0.3.31" +luz = { version = "0.1.0", path = "../luz" } +peanuts = { version = "0.1.0", path = "../../peanuts" } +jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } +tokio = "1.42.0" +tokio-stream = "0.1.17" +tokio-util = "0.7.13" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +thiserror = "2.0.11" diff --git a/lampada/README.md b/lampada/README.md new file mode 100644 index 0000000..dc5f016 --- /dev/null +++ b/lampada/README.md @@ -0,0 +1,3 @@ +# lampada + +a core xmpp client that graciously manages streams, delegating logic to an implementor of a trait. diff --git a/luz/scratch b/lampada/scratch index 9954aef..e013ded 100644 --- a/luz/scratch +++ b/lampada/scratch @@ -1,3 +1,6 @@ +macaw/céu +canopy/sol + # logic: - db diff --git a/luz/src/connection/mod.rs b/lampada/src/connection/mod.rs index 288de70..1e767b0 100644 --- a/luz/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -7,8 +7,8 @@ use std::{ time::Duration, }; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use jid::JID; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use read::{ReadControl, ReadControlHandle, ReadState}; use stanza::client::Stanza; use tokio::{ @@ -19,9 +19,8 @@ use tracing::info; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; use crate::{ - db::Db, - error::{ConnectionError, Error, ReadError, WriteError}, - Connected, Logic, LogicState, UpdateMessage, + error::{ConnectionError, WriteError}, + Connected, Logic, }; mod read; @@ -84,7 +83,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { match msg { SupervisorCommand::Disconnect => { info!("disconnecting"); - // TODO: do handle_disconnect here + self.logic + .handle_disconnect(self.connected.clone()) + .await; let _ = self.write_control_handle.send(WriteControl::Disconnect).await; let _ = self.read_control_handle.send(ReadControl::Disconnect).await; info!("sent disconnect command"); @@ -108,11 +109,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { // send abort to read stream, as already done, consider let (read_state, mut write_state); match state { - // TODO: proper state things for read and write thread ChildState::Write(receiver) => { write_state = receiver; let (send, recv) = oneshot::channel(); let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else if let Ok(state) = recv.await { read_state = state; } else { @@ -135,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); @@ -182,7 +183,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); // TODO: same here - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); @@ -226,7 +227,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); diff --git a/luz/src/connection/read.rs b/lampada/src/connection/read.rs index 4e55bc5..cc69387 100644 --- a/luz/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -7,24 +7,15 @@ use std::{ time::Duration, }; -use chrono::{DateTime, Utc}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; -use uuid::Uuid; - -use crate::{ - chat::{Body, Message}, - db::Db, - error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, - presence::{Offline, Online, Presence, PresenceType, Show}, - roster::Contact, - Connected, Logic, LogicState, UpdateMessage, -}; + +use crate::{Connected, Logic}; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; diff --git a/luz/src/connection/write.rs b/lampada/src/connection/write.rs index ff78b81..8f0c34b 100644 --- a/luz/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,6 +1,6 @@ use std::ops::{Deref, DerefMut}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot}, diff --git a/lampada/src/error.rs b/lampada/src/error.rs new file mode 100644 index 0000000..cdfb4db --- /dev/null +++ b/lampada/src/error.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use stanza::client::Stanza; +use thiserror::Error; +use tokio::{ + sync::{mpsc::error::SendError, oneshot::error::RecvError}, + time::error::Elapsed, +}; + +#[derive(Debug, Error, Clone)] +pub enum ConnectionError { + #[error("connection failed: {0}")] + ConnectionFailed(#[from] luz::Error), + #[error("already connected")] + AlreadyConnected, + #[error("already disconnected")] + AlreadyDisconnected, + #[error("lost connection")] + LostConnection, + // TODO: Display for Content + #[error("disconnected")] + Disconnected, +} + +#[derive(Debug, Error, Clone)] +pub enum CommandError<T> { + #[error("actor: {0}")] + Actor(ActorError), + #[error("{0}")] + Error(#[from] T), +} + +#[derive(Debug, Error, Clone)] +pub enum WriteError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, + // TODO: should this be in writeerror or separate? + #[error("actor: {0}")] + Actor(#[from] ActorError), + #[error("disconnected")] + Disconnected, +} + +// TODO: separate peanuts read and write error? +// TODO: which crate +#[derive(Debug, Error, Clone)] +pub enum ReadError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, +} + +#[derive(Debug, Error, Clone)] +pub enum ActorError { + #[error("receive timed out")] + Timeout, + #[error("could not send message to actor, channel closed")] + Send, + #[error("could not receive message from actor, channel closed")] + Receive, +} + +impl From<Elapsed> for ActorError { + fn from(_e: Elapsed) -> Self { + Self::Timeout + } +} + +impl<T> From<SendError<T>> for ActorError { + fn from(_e: SendError<T>) -> Self { + Self::Send + } +} + +impl From<RecvError> for ActorError { + fn from(_e: RecvError) -> Self { + Self::Receive + } +} diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs new file mode 100644 index 0000000..c61c596 --- /dev/null +++ b/lampada/src/lib.rs @@ -0,0 +1,238 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +pub use connection::write::WriteMessage; +pub use connection::SupervisorSender; +use error::ConnectionError; +use futures::{future::Fuse, FutureExt}; +use luz::JID; +use stanza::client::{ + iq::{self, Iq, IqType}, + Stanza, +}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::JoinSet, + time::timeout, +}; +use tracing::{debug, info}; + +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; + +mod connection; +pub mod error; + +#[derive(Clone)] +pub struct Connected { + // full jid will stay stable across reconnections + jid: JID, + write_handle: WriteHandle, +} + +impl Connected { + pub fn jid(&self) -> &JID { + &self.jid + } + + pub fn write_handle(&self) -> &WriteHandle { + &self.write_handle + } +} + +/// everything that a particular xmpp client must implement +pub trait Logic { + /// the command message type + type Cmd; + + /// run after binding to the stream (e.g. for a chat client, ) + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + fn handle_disconnect( + self, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + Send; + + /// run to handle an incoming xmpp stanza + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when a connection is currently established + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when disconnected + fn handle_offline( + self, + command: Self::Cmd, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// handle connection errors from the core client logic + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + // async fn handle_stream_error(self, error) {} +} + +/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core +pub struct CoreClient<Lgc: Logic> { + jid: JID, + // TODO: use a dyn passwordprovider trait to avoid storing password in memory + password: Arc<String>, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // connected_intention: bool, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + // config: LampConfig, + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? + tasks: JoinSet<()>, +} + +impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { + /// create a new actor + pub fn new( + jid: JID, + password: String, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + ) -> Self { + Self { + jid, + password: Arc::new(password), + connected, + receiver, + connection_supervisor_shutdown, + logic, + tasks: JoinSet::new(), + } + } + + /// run the actor + pub async fn run(mut self) { + loop { + let msg = tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? + _ = &mut self.connection_supervisor_shutdown => { + self.connected = None; + continue; + } + Some(msg) = self.receiver.recv() => { + msg + }, + else => break, + }; + match msg { + CoreClientCommand::Connect => { + match self.connected { + Some(_) => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyConnected) + .await; + } + None => { + let mut jid = self.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + luz::connect_and_login(&mut jid, &*self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + debug!("ok stream result"); + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + shutdown_send, + jid.clone(), + self.password.clone(), + self.logic.clone(), + ); + + let shutdown_recv = shutdown_recv.fuse(); + self.connection_supervisor_shutdown = shutdown_recv; + + let connected = Connected { + jid, + write_handle: writer, + }; + + self.logic.clone().handle_connect(connected.clone()).await; + + self.connected = Some((connected, supervisor)); + } + Err(e) => { + tracing::error!("error: {}", e); + self.logic + .clone() + .handle_connection_error(ConnectionError::ConnectionFailed( + e.into(), + )) + .await; + } + } + } + }; + } + CoreClientCommand::Disconnect => match self.connected { + None => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyDisconnected) + .await; + } + ref mut c => { + if let Some((connected, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + CoreClientCommand::Command(command) => { + match self.connected.as_ref() { + Some((w, s)) => self + .tasks + .spawn(self.logic.clone().handle_online(command, w.clone())), + None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + }; + } + } + } + } +} + +// TODO: generate methods for each with a macro +pub enum CoreClientCommand<C> { + // TODO: login invisible xep-0186 + /// connect to XMPP chat server. gets roster and publishes initial presence. + Connect, + /// disconnect from XMPP chat server, sending unavailable presence then closing stream. + Disconnect, + /// TODO: generics + Command(C), +} diff --git a/luz/src/main.rs b/lampada/src/main.rs index 5aeef14..7b7469d 100644 --- a/luz/src/main.rs +++ b/lampada/src/main.rs @@ -1,7 +1,7 @@ use std::{path::Path, str::FromStr, time::Duration}; use jid::JID; -use luz::{db::Db, LuzHandle, LuzMessage}; +use lampada::{db::Db, CoreClientCommand, LuzHandle}; use sqlx::SqlitePool; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -24,11 +24,11 @@ async fn main() { } }); - luz.send(LuzMessage::Connect).await.unwrap(); + luz.send(CoreClientCommand::Connect).await.unwrap(); let (send, recv) = oneshot::channel(); tokio::time::sleep(Duration::from_secs(5)).await; info!("sending message"); - luz.send(LuzMessage::SendMessage( + luz.send(CoreClientCommand::SendMessage( JID::from_str("cel@blos.sm").unwrap(), luz::chat::Body { body: "hallo!!!".to_string(), diff --git a/jabber/Cargo.lock b/luz/Cargo.lock index d45d7c1..d45d7c1 100644 --- a/jabber/Cargo.lock +++ b/luz/Cargo.lock diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 08a0d6c..c1c1511 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -1,20 +1,42 @@ [package] name = "luz" +authors = ["cel <cel@bunny.garden>"] version = "0.1.0" edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + [dependencies] -futures = "0.3.31" -jabber = { version = "0.1.0", path = "../jabber" } +async-recursion = "1.0.4" +async-trait = "0.1.68" +lazy_static = "1.4.0" +nanoid = "0.4.0" +# TODO: remove unneeded features and dependencies +rsasl = { version = "2.0.1", default_features = false, features = [ + "provider_base64", + "plain", + "config_builder", + "scram-sha-1", +] } +tokio = { version = "1.28", features = ["full"] } +tokio-native-tls = "0.3.1" +tracing = "0.1.40" +trust-dns-resolver = "0.22.0" +try_map = "0.3.1" +stanza = { version = "0.1.0", path = "../stanza" } peanuts = { version = "0.1.0", path = "../../peanuts" } -jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } -sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } -stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } -tokio = "1.42.0" -tokio-stream = "0.1.17" -tokio-util = "0.7.13" -tracing = "0.1.41" -tracing-subscriber = "0.3.19" -uuid = { version = "1.13.1", features = ["v4"] } +jid = { version = "0.1.0", path = "../jid" } +futures = "0.3.31" +take_mut = "0.2.2" +pin-project-lite = "0.2.15" +pin-project = "1.1.7" thiserror = "2.0.11" -chrono = "0.4.40" + +[dev-dependencies] +test-log = { version = "0.2", features = ["trace"] } +env_logger = "*" +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "env-filter", + "fmt", +] } +stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0199"] } diff --git a/jabber/src/client.rs b/luz/src/client.rs index de2be08..de2be08 100644 --- a/jabber/src/client.rs +++ b/luz/src/client.rs diff --git a/jabber/src/connection.rs b/luz/src/connection.rs index b185eca..b185eca 100644 --- a/jabber/src/connection.rs +++ b/luz/src/connection.rs diff --git a/luz/src/error.rs b/luz/src/error.rs index 46f45a8..ec60778 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -1,214 +1,58 @@ +use std::str::Utf8Error; use std::sync::Arc; -use stanza::client::Stanza; +use jid::ParseError; +use rsasl::mechname::MechanismNameError; +use stanza::client::error::Error as ClientError; +use stanza::sasl::Failure; +use stanza::stream::Error as StreamError; use thiserror::Error; -use tokio::{ - sync::{mpsc::error::SendError, oneshot::error::RecvError}, - time::error::Elapsed, -}; -#[derive(Debug, Error, Clone)] -pub enum ConnectionError { - #[error("connection failed: {0}")] - ConnectionFailed(#[from] jabber::Error), - #[error("already connected")] - AlreadyConnected, - #[error("already disconnected")] - AlreadyDisconnected, - #[error("lost connection")] - LostConnection, - // TODO: Display for Content - #[error("disconnected")] - Disconnected, -} - -// for the client logic impl -#[derive(Debug, Error, Clone)] +#[derive(Error, Debug, Clone)] pub enum Error { - #[error("core error: {0}")] - Connection(#[from] ConnectionError), - #[error("received unrecognized/unsupported content: {0:?}")] - UnrecognizedContent(peanuts::element::Content), - #[error("iq receive error: {0}")] - Iq(IqError), - // TODO: change to Connecting(ConnectingError) - #[error("connecting: {0}")] - Connecting(#[from] ConnectionJobError), - #[error("presence: {0}")] - Presence(#[from] PresenceError), - #[error("set status: {0}")] - SetStatus(#[from] StatusError), - // TODO: have different ones for get/update/set - #[error("roster: {0}")] - Roster(RosterError), - #[error("stream error: {0}")] - Stream(#[from] stanza::stream::Error), - #[error("message send error: {0}")] - MessageSend(MessageSendError), - #[error("message receive error: {0}")] - MessageRecv(MessageRecvError), -} - -#[derive(Debug, Error, Clone)] -pub enum CommandError<T> { - #[error("actor: {0}")] - Actor(ActorError), - #[error("{0}")] - Error(#[from] T), -} - -#[derive(Debug, Error, Clone)] -pub enum MessageSendError { - #[error("could not add to message history: {0}")] - MessageHistory(#[from] DatabaseError), -} - -#[derive(Debug, Error, Clone)] -pub enum PresenceError { - #[error("unsupported")] + #[error("connection")] + Connection, + #[error("utf8 decode: {0}")] + Utf8Decode(#[from] Utf8Error), + #[error("negotiation")] + Negotiation, + #[error("tls required")] + TlsRequired, + #[error("already connected with tls")] + AlreadyTls, + // TODO: specify unsupported feature + #[error("unsupported feature")] Unsupported, - #[error("missing from")] - MissingFrom, - #[error("stanza error: {0}")] - StanzaError(#[from] stanza::client::error::Error), -} - -#[derive(Debug, Error, Clone)] -// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc. -pub enum IqError { - #[error("no iq with id matching `{0}`")] - NoMatchingId(String), -} - -#[derive(Debug, Error, Clone)] -pub enum MessageRecvError { - #[error("could not add to message history: {0}")] - MessageHistory(#[from] DatabaseError), - #[error("missing from")] - MissingFrom, -} - -#[derive(Debug, Clone, Error)] -pub enum ConnectionJobError { - #[error("connection failed: {0}")] - ConnectionFailed(#[from] jabber::Error), - #[error("failed roster retreival: {0}")] - RosterRetreival(#[from] RosterError), - #[error("failed to send available presence: {0}")] - SendPresence(#[from] WriteError), - #[error("cached status: {0}")] - StatusCacheError(#[from] DatabaseError), -} - -#[derive(Debug, Error, Clone)] -pub enum RosterError { - #[error("cache: {0}")] - Cache(#[from] DatabaseError), - #[error("stream write: {0}")] - Write(#[from] WriteError), - // TODO: display for stanza, to show as xml, same for read error types. - #[error("unexpected reply: {0:?}")] - UnexpectedStanza(Stanza), - #[error("stream read: {0}")] - Read(#[from] ReadError), - #[error("stanza error: {0}")] - StanzaError(#[from] stanza::client::error::Error), -} - -#[derive(Debug, Error, Clone)] -#[error("database error: {0}")] -pub struct DatabaseError(Arc<sqlx::Error>); - -#[derive(Debug, Error, Clone)] -pub enum DatabaseOpenError { - #[error("error: {0}")] - Error(Arc<sqlx::Error>), - #[error("migration: {0}")] - Migration(Arc<sqlx::migrate::MigrateError>), - #[error("io: {0}")] - Io(Arc<tokio::io::Error>), - #[error("invalid path")] - InvalidPath, -} - -impl From<sqlx::Error> for DatabaseError { - fn from(e: sqlx::Error) -> Self { - Self(Arc::new(e)) - } -} - -impl From<sqlx::Error> for DatabaseOpenError { - fn from(e: sqlx::Error) -> Self { - Self::Error(Arc::new(e)) - } -} - -impl From<sqlx::migrate::MigrateError> for DatabaseOpenError { - fn from(e: sqlx::migrate::MigrateError) -> Self { - Self::Migration(Arc::new(e)) - } -} - -impl From<tokio::io::Error> for DatabaseOpenError { - fn from(e: tokio::io::Error) -> Self { - Self::Io(Arc::new(e)) - } -} - -#[derive(Debug, Error, Clone)] -pub enum StatusError { - #[error("cache: {0}")] - Cache(#[from] DatabaseError), - #[error("stream write: {0}")] - Write(#[from] WriteError), -} - -#[derive(Debug, Error, Clone)] -pub enum WriteError { - #[error("xml: {0}")] - XML(#[from] peanuts::Error), - #[error("lost connection")] - LostConnection, - // TODO: should this be in writeerror or separate? - #[error("actor: {0}")] - Actor(#[from] ActorError), - #[error("disconnected")] - Disconnected, -} - -// TODO: separate peanuts read and write error? -#[derive(Debug, Error, Clone)] -pub enum ReadError { - #[error("xml: {0}")] + #[error("jid missing localpart")] + NoLocalpart, + #[error("received unexpected element: {0:?}")] + UnexpectedElement(peanuts::Element), + #[error("xml error: {0}")] XML(#[from] peanuts::Error), - #[error("lost connection")] - LostConnection, -} - -#[derive(Debug, Error, Clone)] -pub enum ActorError { - #[error("receive timed out")] - Timeout, - #[error("could not send message to actor, channel closed")] - Send, - #[error("could not receive message from actor, channel closed")] - Receive, -} - -impl From<Elapsed> for ActorError { - fn from(_e: Elapsed) -> Self { - Self::Timeout - } + #[error("sasl error: {0}")] + SASL(#[from] SASLError), + #[error("jid error: {0}")] + JID(#[from] ParseError), + #[error("client stanza error: {0}")] + ClientError(#[from] ClientError), + #[error("stream error: {0}")] + StreamError(#[from] StreamError), + #[error("error missing")] + MissingError, } -impl<T> From<SendError<T>> for ActorError { - fn from(_e: SendError<T>) -> Self { - Self::Send - } +#[derive(Error, Debug, Clone)] +pub enum SASLError { + #[error("sasl error: {0}")] + SASL(Arc<rsasl::prelude::SASLError>), + #[error("mechanism error: {0}")] + MechanismName(#[from] MechanismNameError), + #[error("authentication failure: {0}")] + Authentication(#[from] Failure), } -impl From<RecvError> for ActorError { - fn from(_e: RecvError) -> Self { - Self::Receive +impl From<rsasl::prelude::SASLError> for SASLError { + fn from(e: rsasl::prelude::SASLError) -> Self { + Self::SASL(Arc::new(e)) } } diff --git a/jabber/src/jabber_stream.rs b/luz/src/jabber_stream.rs index 302350d..302350d 100644 --- a/jabber/src/jabber_stream.rs +++ b/luz/src/jabber_stream.rs diff --git a/jabber/src/jabber_stream/bound_stream.rs b/luz/src/jabber_stream/bound_stream.rs index 25b79ff..25b79ff 100644 --- a/jabber/src/jabber_stream/bound_stream.rs +++ b/luz/src/jabber_stream/bound_stream.rs diff --git a/luz/src/lib.rs b/luz/src/lib.rs index b9c482c..8855ca7 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -1,1774 +1,25 @@ -use std::{ - collections::HashMap, - ops::{Deref, DerefMut}, - str::FromStr, - sync::Arc, - time::Duration, -}; +#![allow(unused_must_use)] +// #![feature(let_chains)] -use chat::{Body, Chat, Message}; -use chrono::Utc; -use connection::{write::WriteMessage, SupervisorSender}; -use db::Db; -use error::{ - ActorError, CommandError, ConnectionError, ConnectionJobError, DatabaseError, IqError, - MessageRecvError, PresenceError, ReadError, RosterError, StatusError, WriteError, -}; -use futures::{future::Fuse, FutureExt}; -use jabber::JID; -use presence::{Offline, Online, Presence, PresenceType, Show}; -use roster::{Contact, ContactUpdate}; -use sqlx::SqlitePool; -use stanza::client::{ - iq::{self, Iq, IqType}, - Stanza, -}; -use tokio::{ - sync::{mpsc, oneshot, Mutex}, - task::JoinSet, - time::timeout, -}; -use tracing::{debug, info}; -use user::User; -use uuid::Uuid; - -use crate::connection::write::WriteHandle; -use crate::connection::{SupervisorCommand, SupervisorHandle}; -use crate::error::Error; - -pub mod chat; -mod connection; -pub mod db; +// TODO: logging (dropped errors) +pub mod client; +pub mod connection; pub mod error; -pub mod presence; -pub mod roster; -pub mod user; - -pub enum Command { - /// get the roster. if offline, retreive cached version from database. should be stored in application memory - GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), - /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) - // TODO: paging and filtering - GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), - // TODO: paging and filtering - GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), - // TODO: paging and filtering - GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), - /// get a specific chat by jid - GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), - /// get message history for chat (does appropriate mam things) - // TODO: paging and filtering - GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), - /// delete a chat from your chat history, along with all the corresponding messages - DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), - /// delete a message from your chat history - DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>), - /// get a user from your users database - GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>), - /// add a contact to your roster, with a status of none, no subscriptions. - AddContact(JID, oneshot::Sender<Result<(), RosterError>>), - /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. - BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), - /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. - SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), - /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. - AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), - /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. - AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), - /// unsubscribe to a contact, but don't remove their subscription. - UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>), - /// stop a contact from being subscribed, but stay subscribed to the contact. - UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>), - /// remove subscriptions to and from contact, but keep in roster. - UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>), - /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. - DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>), - /// update contact. contact details will be overwritten with the contents of the contactupdate struct. - UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>), - /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. - SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), - /// send presence stanza - // TODO: cache presence stanza - SendPresence( - Option<JID>, - PresenceType, - oneshot::Sender<Result<(), WriteError>>, - ), - /// send a directed presence (usually to a non-contact). - // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) - /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a - /// chatroom). if disconnected, will be cached so when client connects, message will be sent. - SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), -} - -#[derive(Debug)] -pub struct Client { - sender: mpsc::Sender<LuzMessage<Command>>, - timeout: Duration, -} - -impl Clone for Client { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - timeout: self.timeout, - } - } -} - -impl Deref for Client { - type Target = mpsc::Sender<LuzMessage<Command>>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for Client { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl Client { - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(LuzMessage::Connect).await?; - Ok(()) - } - - pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { - self.send(LuzMessage::Disconnect).await?; - Ok(()) - } - - pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { - let (command_sender, command_receiver) = mpsc::channel(20); - let (update_send, update_recv) = mpsc::channel(20); - - // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) - let (_sup_send, sup_recv) = oneshot::channel(); - let sup_recv = sup_recv.fuse(); - - let logic = LogicState { - db, - pending: Arc::new(Mutex::new(HashMap::new())), - update_sender: update_send, - }; - - let actor: Luz<LogicState> = - Luz::new(jid, password, command_receiver, None, sup_recv, logic); - tokio::spawn(async move { actor.run().await }); - - ( - Self { - sender: command_sender, - // TODO: configure timeout - timeout: Duration::from_secs(10), - }, - update_recv, - ) - } - - pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetRoster(send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let roster = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(roster) - } - - pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetChats(send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(chats) - } - - pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetChatsOrdered(send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(chats) - } - - pub async fn get_chats_ordered_with_latest_messages( - &self, - ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command( - Command::GetChatsOrderedWithLatestMessages(send), - )) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(chats) - } - - pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetChat(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let chat = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(chat) - } - - pub async fn get_messages( - &self, - jid: JID, - ) -> Result<Vec<Message>, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetMessages(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let messages = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(messages) - } - - pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::DeleteChat(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::DeleteMessage(id, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::GetUser(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::AddContact(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::BuddyRequest(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::SubscriptionRequest(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::AcceptBuddyRequest(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } +pub mod jabber_stream; - pub async fn accept_subscription_request( - &self, - jid: JID, - ) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::AcceptSubscriptionRequest( - jid, send, - ))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } +pub use connection::Connection; +pub use error::Error; +pub use jabber_stream::JabberStream; +pub use jid::JID; - pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::UnsubscribeFromContact( - jid, send, - ))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } +pub type Result<T> = std::result::Result<T, Error>; - pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::UnsubscribeContact(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::UnfriendContact(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::DeleteContact(jid, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn update_contact( - &self, - jid: JID, - update: ContactUpdate, - ) -> Result<(), CommandError<RosterError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::UpdateContact( - jid, update, send, - ))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::SetStatus(online, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } - - pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { - let (send, recv) = oneshot::channel(); - self.send(LuzMessage::Command(Command::SendMessage(jid, body, send))) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? - .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; - Ok(result) - } -} - -#[derive(Clone)] -pub struct LogicState { - db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - update_sender: mpsc::Sender<UpdateMessage>, -} - -impl Logic for LogicState { - type Cmd = Command; - - async fn handle_connect(self, connection: Connected) { - let (send, recv) = oneshot::channel(); - debug!("getting roster"); - self.clone() - .handle_online(Command::GetRoster(send), connection.clone()) - .await; - debug!("sent roster req"); - let roster = recv.await; - debug!("got roster"); - match roster { - Ok(r) => match r { - Ok(roster) => { - let online = self.db.read_cached_status().await; - let online = match online { - Ok(online) => online, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::StatusCacheError(e.into()), - ))) - .await; - Online::default() - } - }; - let (send, recv) = oneshot::channel(); - self.clone() - .handle_online( - Command::SendPresence(None, PresenceType::Online(online.clone()), send), - connection, - ) - .await; - let set_status = recv.await; - match set_status { - Ok(s) => match s { - Ok(()) => { - let _ = self - .update_sender - .send(UpdateMessage::Online(online, roster)) - .await; - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::SendPresence(WriteError::Actor(e.into())), - ))) - .await; - } - } - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( - e.into(), - ))), - ))) - .await; - } - } - } - - async fn handle_disconnect(self, connection: Connected) { - // TODO: be able to set offline status message - let offline_presence: stanza::client::presence::Presence = - Offline::default().into_stanza(None); - let stanza = Stanza::Presence(offline_presence); - // TODO: timeout and error check - connection.write_handle.write(stanza).await; - let _ = self - .update_sender - .send(UpdateMessage::Offline(Offline::default())) - .await; - } - - async fn handle_stanza( - self, - stanza: Stanza, - connection: Connected, - supervisor: SupervisorSender, - ) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; - // TODO: can this be more efficient? - let result = self - .db - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; - } - } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = self - .update_sender - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; - } - } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = self.pending.lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - self.db.delete_contact(item.jid.clone()).await; - self.update_sender - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = - self.db.upsert_contact(contact.clone()).await - { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } - let _ = self - .update_sender - .send(UpdateMessage::RosterUpdate(contact)) - .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); - } - } - } - } - // TODO: send unsupported to server - _ => {} - } - } else { - // TODO: send error (unsupported) to server - } - } - }, - Stanza::Error(error) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect - } - Stanza::OtherContent(content) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::UnrecognizedContent(content))); - // TODO: send error to write_thread - } - } - } - - async fn handle_online(self, command: Command, connection: Connected) { - match command { - Command::GetRoster(result_sender) => { - // TODO: jid resource should probably be stored within the connection - debug!("before client_jid lock"); - debug!("after client_jid lock"); - let iq_id = Uuid::new_v4().to_string(); - let (send, iq_recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let stanza = Stanza::Iq(Iq { - from: Some(connection.jid), - id: iq_id.to_string(), - to: None, - r#type: IqType::Get, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: Vec::new(), - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - let _ = connection - .write_handle - .send(WriteMessage { - stanza, - respond_to: send, - }) - .await; - // TODO: timeout - match recv.await { - Ok(Ok(())) => info!("roster request sent"), - Ok(Err(e)) => { - // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError::Write(e.into()))); - return; - } - Err(e) => { - let _ = result_sender - .send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - }; - // TODO: timeout - match iq_recv.await { - Ok(Ok(stanza)) => match stanza { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - let contacts: Vec<Contact> = - items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { - self.update_sender - .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( - e.into(), - )))) - .await; - }; - result_sender.send(Ok(contacts)); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - result_sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - result_sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Ok(Err(e)) => { - result_sender.send(Err(RosterError::Read(e))); - return; - } - Err(e) => { - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: None, - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - // TODO: write_handle send helper function - let result = connection.write_handle.write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::BuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - } - } - Command::SubscriptionRequest(jid, sender) => { - // TODO: i should probably have builders - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - Command::AcceptBuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - } - } - Command::AcceptSubscriptionRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeFromContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - Command::UnfriendContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle.write(presence).await; - let _ = sender.send(result); - } - } - } - Command::DeleteContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: Some(stanza::roster::Subscription::Remove), - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle.write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::UpdateContact(jid, contact_update, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let groups = Vec::from_iter( - contact_update - .groups - .into_iter() - .map(|group| stanza::roster::Group(Some(group))), - ); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: contact_update.name, - subscription: None, - groups, - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle.write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::SetStatus(online, sender) => { - let result = self.db.upsert_cached_status(online.clone()).await; - if let Err(e) = result { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( - e.into(), - )))) - .await; - } - let result = connection - .write_handle - .write(Stanza::Presence(online.into_stanza(None))) - .await - .map_err(|e| StatusError::Write(e)); - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(jid, body, sender) => { - let id = Uuid::new_v4(); - let message = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid.clone()), - id: Some(id.to_string()), - to: Some(jid.clone()), - // TODO: specify message type - r#type: stanza::client::message::MessageType::Chat, - // TODO: lang ? - lang: None, - subject: None, - body: Some(stanza::client::message::Body { - lang: None, - body: Some(body.body.clone()), - }), - thread: None, - delay: None, - }); - let _ = sender.send(Ok(())); - // let _ = sender.send(Ok(message.clone())); - let result = connection.write_handle.write(message).await; - match result { - Ok(_) => { - let mut message = Message { - id, - from: connection.jid, - body, - timestamp: Utc::now(), - }; - info!("send message {:?}", message); - if let Err(e) = self - .db - .create_message_with_self_resource_and_chat( - message.clone(), - jid.clone(), - ) - .await - .map_err(|e| e.into()) - { - tracing::error!("{}", e); - let _ = - self.update_sender - .send(UpdateMessage::Error(Error::MessageSend( - error::MessageSendError::MessageHistory(e), - ))); - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: jid, message }) - .await; - } - Err(_) => { - // let _ = sender.send(result); - } - } - } - Command::SendPresence(jid, presence, sender) => { - let mut presence: stanza::client::presence::Presence = presence.into(); - if let Some(jid) = jid { - presence.to = Some(jid); - }; - let result = connection - .write_handle - .write(Stanza::Presence(presence)) - .await; - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - } - } - - async fn handle_offline(self, command: Command) { - match command { - Command::GetRoster(sender) => { - let roster = self.db.read_cached_roster().await; - match roster { - Ok(roster) => { - let _ = sender.send(Ok(roster)); - } - Err(e) => { - let _ = sender.send(Err(RosterError::Cache(e.into()))); - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::BuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptBuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptSubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeFromContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnfriendContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::DeleteContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::UpdateContact(_jid, _contact_update, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::SetStatus(online, sender) => { - let result = self - .db - .upsert_cached_status(online) - .await - .map_err(|e| StatusError::Cache(e.into())); - sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(_jid, _body, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SendPresence(_jid, _presence, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - } - } - // pub async fn handle_stream_error(self, error) {} - // stanza errors (recoverable) - // pub async fn handle_error(self, error: Error) {} - // when it aborts, must clear iq map no matter what - async fn on_abort(self) { - let mut iqs = self.pending.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } - } - - async fn handle_connection_error(self, error: ConnectionError) { - self.update_sender - .send(UpdateMessage::Error( - ConnectionError::AlreadyConnected.into(), - )) - .await; - } -} - -#[derive(Clone)] -pub struct Connected { - // full jid will stay stable across reconnections - jid: JID, - write_handle: WriteHandle, -} - -pub trait Logic { - type Cmd; - - fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; - fn handle_disconnect( - self, - connection: Connected, - ) -> impl std::future::Future<Output = ()> + Send; - fn handle_stanza( - self, - stanza: Stanza, - connection: Connected, - supervisor: SupervisorSender, - ) -> impl std::future::Future<Output = ()> + std::marker::Send; - fn handle_online( - self, - command: Self::Cmd, - connection: Connected, - ) -> impl std::future::Future<Output = ()> + std::marker::Send; - fn handle_offline( - self, - command: Self::Cmd, - ) -> impl std::future::Future<Output = ()> + std::marker::Send; - fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; - // TODO: look at these - fn handle_connection_error( - self, - error: ConnectionError, - ) -> impl std::future::Future<Output = ()> + std::marker::Send; - // async fn handle_stream_error(self, error) {} -} - -pub struct Luz<Lgc: Logic> { - jid: JID, - password: Arc<String>, - receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>, - // TODO: use a dyn passwordprovider trait to avoid storing password in memory - connected: Option<(Connected, SupervisorHandle)>, - // connected_intention: bool, - /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected - connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, - // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) - // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? - // TODO: genericize - logic: Lgc, - // config: LampConfig, - tasks: JoinSet<()>, -} - -impl<Lgc: Logic + Clone + Send + 'static> Luz<Lgc> { - fn new( - jid: JID, - password: String, - receiver: mpsc::Receiver<LuzMessage<Lgc::Cmd>>, - connected: Option<(Connected, SupervisorHandle)>, - connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, - logic: Lgc, - ) -> Self { - Self { - jid, - password: Arc::new(password), - connected, - receiver, - connection_supervisor_shutdown, - logic, - tasks: JoinSet::new(), - } - } - - async fn run(mut self) { - loop { - let msg = tokio::select! { - // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy - // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? - _ = &mut self.connection_supervisor_shutdown => { - self.connected = None; - continue; - } - Some(msg) = self.receiver.recv() => { - msg - }, - else => break, - }; - // TODO: consider separating disconnect/connect and commands apart from commandmessage - // TODO: dispatch commands separate tasks - match msg { - LuzMessage::Connect => { - match self.connected { - Some(_) => { - self.logic - .clone() - .handle_connection_error(ConnectionError::AlreadyConnected) - .await; - } - None => { - let mut jid = self.jid.clone(); - let mut domain = jid.domainpart.clone(); - // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) - let streams_result = - jabber::connect_and_login(&mut jid, &*self.password, &mut domain) - .await; - match streams_result { - Ok(s) => { - debug!("ok stream result"); - let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); - let (writer, supervisor) = SupervisorHandle::new( - s, - shutdown_send, - jid.clone(), - self.password.clone(), - self.logic.clone(), - ); - - let shutdown_recv = shutdown_recv.fuse(); - self.connection_supervisor_shutdown = shutdown_recv; - - let connected = Connected { - jid, - write_handle: writer, - }; - - self.logic.clone().handle_connect(connected.clone()).await; - - self.connected = Some((connected, supervisor)); - } - Err(e) => { - tracing::error!("error: {}", e); - self.logic - .clone() - .handle_connection_error(ConnectionError::ConnectionFailed( - e.into(), - )) - .await; - } - } - } - }; - } - LuzMessage::Disconnect => match self.connected { - None => { - self.logic - .clone() - .handle_connection_error(ConnectionError::AlreadyDisconnected) - .await; - } - ref mut c => { - if let Some((connected, supervisor_handle)) = c.take() { - // TODO: better disconnect logic, only reflect once actually disconnected - // TODO: call within supervisor instead - self.logic - .clone() - .handle_disconnect(connected.clone()) - .await; - let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; - } else { - unreachable!() - }; - } - }, - LuzMessage::Command(command) => { - match self.connected.as_ref() { - Some((w, s)) => self - .tasks - .spawn(self.logic.clone().handle_online(command, w.clone())), - None => self.tasks.spawn(self.logic.clone().handle_offline(command)), - }; - } - } - } - } -} - -// TODO: generate methods for each with a macro -pub enum LuzMessage<C> { - // TODO: login invisible xep-0186 - /// connect to XMPP chat server. gets roster and publishes initial presence. - Connect, - /// disconnect from XMPP chat server, sending unavailable presence then closing stream. - Disconnect, - /// TODO: generics - Command(C), -} +pub use client::connect_and_login; -#[derive(Debug, Clone)] -pub enum UpdateMessage { - Error(Error), - Online(Online, Vec<Contact>), - Offline(Offline), - /// received roster from jabber server (replace full app roster state with this) - /// is this needed? - FullRoster(Vec<Contact>), - /// (only update app roster state, don't replace) - RosterUpdate(Contact), - RosterDelete(JID), - /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone - Presence { - from: JID, - presence: Presence, - }, - // TODO: receipts - // MessageDispatched(Uuid), - Message { - to: JID, - message: Message, - }, - SubscriptionRequest(jid::JID), +#[cfg(test)] +mod tests { + // #[tokio::test] + // async fn test_login() { + // crate::login("test@blos.sm/clown", "slayed").await.unwrap(); + // } } |