// NOTE(review): this copy of the file appears to have lost its generic parameter lists
// (see e.g. `oneshot::Sender, RosterError>>` and `Into::::into(e)` below). the code tokens
// are reproduced exactly as found; restore the missing type parameters from version control
// before attempting to build.

use std::{
    collections::HashMap,
    ops::{Deref, DerefMut},
    str::FromStr,
    sync::Arc,
    time::Duration,
};

use chat::{Body, Chat, Delivery, Message};
use chrono::Utc;
use db::Db;
use disco::{Info, Items};
use error::{
    AvatarPublishError, ConnectionJobError, DatabaseError, DiscoError, Error, IqError,
    MessageRecvError, NickError, PEPError, PresenceError, RosterError, StatusError, SubscribeError,
};
use files::FileStore;
use futures::FutureExt;
use jid::JID;
use lampada::{
    Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
    error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
};
use logic::ClientLogic;
use presence::{Offline, Online, Presence, PresenceType, Show};
use roster::{Contact, ContactUpdate};
use stanza::client::{
    Stanza,
    iq::{self, Iq, IqType},
};
use tokio::{
    sync::{Mutex, mpsc, oneshot},
    time::timeout,
};
use tracing::{debug, info};
use user::User;
use uuid::Uuid;

pub mod avatar;
pub mod caps;
pub mod chat;
pub mod db;
pub mod disco;
pub mod error;
pub mod files;
mod logic;
pub mod pep;
pub mod presence;
pub mod roster;
pub mod user;

/// commands that a [`Client`] sends to the client actor, wrapped in
/// `CoreClientCommand::Command`.
///
/// every variant except `SendMessage` carries a `oneshot::Sender` on which the outcome of the
/// command is reported back to the caller.
pub enum Command {
    /// get the roster. if offline, retrieve cached version from database. should be stored in application memory
    GetRoster(oneshot::Sender, RosterError>>),
    /// get the roster. if offline, retrieve cached version from database. should be stored in application memory. includes user associated with each contact
    GetRosterWithUsers(oneshot::Sender, RosterError>>),
    /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
    // TODO: paging and filtering
    GetChats(oneshot::Sender, DatabaseError>>),
    /// get all chats, ordered (the ordering criterion is decided by the handler, not visible here)
    // TODO: paging and filtering
    GetChatsOrdered(oneshot::Sender, DatabaseError>>),
    /// get all chats, ordered, together with the latest message of each chat
    // TODO: paging and filtering
    GetChatsOrderedWithLatestMessages(oneshot::Sender, DatabaseError>>),
    /// get all chats, ordered, together with the latest message of each chat and the associated users
    // TODO: paging and filtering, nullability for latest message
    GetChatsOrderedWithLatestMessagesAndUsers(
        oneshot::Sender, DatabaseError>>,
    ),
    /// get a specific chat by jid
    GetChat(JID, oneshot::Sender>),
    /// get message history for chat (does appropriate mam things)
    // TODO: paging and filtering
    GetMessages(JID, oneshot::Sender, DatabaseError>>),
    /// get message history for chat (does appropriate mam things), including the users
    // TODO: paging and filtering
    GetMessagesWithUsers(
        JID,
        oneshot::Sender, DatabaseError>>,
    ),
    /// delete a chat from your chat history, along with all the corresponding messages
    DeleteChat(JID, oneshot::Sender>),
    /// delete a message from your chat history
    DeleteMessage(Uuid, oneshot::Sender>),
    /// get a user from your users database
    GetUser(JID, oneshot::Sender>),
    /// add a contact to your roster, with a status of none, no subscriptions.
    AddContact(JID, oneshot::Sender>),
    /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
    BuddyRequest(JID, oneshot::Sender>),
    /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
    SubscriptionRequest(JID, oneshot::Sender>),
    /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
    AcceptBuddyRequest(JID, oneshot::Sender>),
    /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
    AcceptSubscriptionRequest(JID, oneshot::Sender>),
    /// unsubscribe from a contact, but don't remove their subscription.
    UnsubscribeFromContact(JID, oneshot::Sender>),
    /// stop a contact from being subscribed, but stay subscribed to the contact.
    UnsubscribeContact(JID, oneshot::Sender>),
    /// remove subscriptions to and from contact, but keep in roster.
    UnfriendContact(JID, oneshot::Sender>),
    /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
    DeleteContact(JID, oneshot::Sender>),
    /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
    UpdateContact(JID, ContactUpdate, oneshot::Sender>),
    /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
    SetStatus(Online, oneshot::Sender>),
    /// send presence stanza
    // TODO: cache presence stanza
    SendPresence(
        Option,
        PresenceType,
        oneshot::Sender>,
    ),
    // (no variant yet) send a directed presence (usually to a non-contact).
    // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
    /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
    /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
    SendMessage(JID, Body),
    // TODO: resend failed messages
    // ResendMessage(Uuid),
    /// disco info query
    DiscoInfo(
        Option,
        Option,
        oneshot::Sender>,
    ),
    /// disco items query
    DiscoItems(
        Option,
        Option,
        oneshot::Sender>,
    ),
    /// publish item to a pep node specified.
    PublishPEPItem {
        item: pep::Item,
        node: String,
        sender: oneshot::Sender>,
    },
    /// delete the pep node specified.
    DeletePEPNode {
        node: String,
        sender: oneshot::Sender>,
    },
    /// get the item with the given id from a pep node, optionally on another jid.
    GetPEPItem {
        jid: Option,
        node: String,
        id: String,
        sender: oneshot::Sender>,
    },
    /// change client user nickname
    ChangeNick(Option, oneshot::Sender>),
    // // TODO
    // GetNick(...),
    // GetAvatar(...)
    // /// get capability node
    // GetCaps(String, oneshot::Sender>),
    /// change client user avatar
    ChangeAvatar(
        Option>,
        oneshot::Sender>>,
    ),
}

/// updates pushed from the client logic to the application.
// NOTE(review): presumably the item type of the receiver returned by `Client::new` — the
// receiver's type parameter is missing in this copy, confirm.
#[derive(Debug, Clone)]
pub enum UpdateMessage {
    /// the client came online; carries the online status and the roster with users.
    Online(Online, Vec<(Contact, User)>),
    /// the client went offline.
    Offline(Offline),
    /// (only update app roster state, don't replace)
    RosterUpdate(Contact, User),
    /// a contact was removed from the roster.
    RosterDelete(JID),
    /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
    Presence {
        from: JID,
        presence: Presence,
    },
    // TODO: receipts
    // MessageDispatched(Uuid),
    /// a message, with the jid it is addressed to and the user it came from.
    Message {
        to: JID,
        from: User,
        message: Message,
    },
    /// the delivery state of the message `id` in `chat` changed.
    MessageDelivery {
        id: Uuid,
        chat: JID,
        delivery: Delivery,
    },
    /// a subscription request was received from the given jid.
    SubscriptionRequest(jid::JID),
    /// the nickname of `jid` changed.
    NickChanged {
        jid: JID,
        nick: Option,
    },
    /// the avatar of `jid` changed.
    AvatarChanged {
        jid: JID,
        id: Option,
    },
}

/// an xmpp client that is suited for a chat client use case
///
/// this is a handle: it only holds the sending half of the command channel and the timeout
/// applied while waiting for command replies.
#[derive(Debug)]
pub struct Client {
    /// channel on which commands are sent to the actor spawned in `Client::new`.
    sender: mpsc::Sender>>,
    /// maximum time the request methods wait for a reply.
    timeout: Duration,
}

impl Client {
    /// returns a new handle that shares this client's command channel but uses `timeout`
    /// as its reply timeout. `self` is left unchanged.
    pub fn with_timeout(&self, timeout: Duration) -> Self {
        Self {
            sender: self.sender.clone(),
            timeout,
        }
    }
}

impl Clone for Client {
    /// clones the handle: the command sender is cloned and the timeout is copied.
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            timeout: self.timeout,
        }
    }
}

// `Client` dereferences to its command sender; the `self.send(...)` calls in the methods
// below resolve through this impl, as `Client` defines no `send` of its own in this file.
impl Deref for Client {
    type Target = mpsc::Sender>>;

    fn deref(&self) -> &Self::Target {
        &self.sender
    }
}

impl DerefMut for Client {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.sender
    }
}

impl Client {
    /// creates a client for `jid`, spawns its actor onto the tokio runtime, and returns the
    /// client handle together with the receiving half of the update channel.
    ///
    /// both the command channel and the update channel are bounded with a capacity of 20.
    /// the handle's reply timeout starts at 10 seconds (see `with_timeout` to change it).
    /// must be called from within a tokio runtime, since it calls `tokio::spawn`.
    pub fn new(
        jid: JID,
        password: String,
        db: Db,
        file_store: Fs,
    ) -> (Self, mpsc::Receiver) {
        let (command_sender, command_receiver) = mpsc::channel(20);
        let (update_send, update_recv) = mpsc::channel(20);

        // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
        let (_sup_send, sup_recv) = oneshot::channel();
        let sup_recv = sup_recv.fuse();

        let client = Self {
            sender: command_sender,
            // TODO: configure timeout
            timeout: Duration::from_secs(10),
        };

        // the logic gets its own clone of the handle and the bare form of the jid.
        let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store);

        let actor: CoreClient> =
            CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
        // the actor runs detached; the join handle is discarded.
        tokio::spawn(async move { actor.run().await });

        (client, update_recv)
    }
}

// request methods.
//
// apart from `connect`, `disconnect` and `send_message` (which only enqueue a command), every
// method below follows the same steps:
//   1. create a oneshot channel for the reply,
//   2. send the matching `Command` variant (wrapped in `CoreClientCommand::Command`) to the actor,
//   3. wait for the reply for at most `self.timeout`,
//   4. return the value carried by the reply.
// a failure to send, an elapsed timeout, or a reply channel that is closed without a reply
// is converted and wrapped in `CommandError::Actor`; an error carried inside the reply itself
// is propagated by the final `?`.
impl Client {
    /// asks the actor to connect. returns as soon as the command is queued.
    pub async fn connect(&self) -> Result<(), ActorError> {
        self.send(CoreClientCommand::Connect).await?;
        Ok(())
    }

    /// asks the actor to disconnect. returns as soon as the command is queued.
    // NOTE(review): `offline` is not used in this body — confirm whether it should be
    // forwarded with the disconnect command.
    pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
        self.send(CoreClientCommand::Disconnect).await?;
        Ok(())
    }

    /// gets the roster (see `Command::GetRoster`), waiting at most `self.timeout` for the reply.
    pub async fn get_roster(&self) -> Result, CommandError> {
        // reply channel: the sending half travels inside the command.
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetRoster(send)))
            .await
            // the command could not be queued.
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let roster = timeout(self.timeout, recv)
            .await
            // first map_err: the timeout elapsed before a reply arrived.
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            // second map_err: the reply sender was dropped without replying.
            // the trailing `?` then propagates the error carried by the reply itself.
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(roster)
    }

    /// gets the roster with users (see `Command::GetRosterWithUsers`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn get_roster_with_users(
        &self,
    ) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetRosterWithUsers(
            send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let roster = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(roster)
    }

    /// gets all chats (see `Command::GetChats`), waiting at most `self.timeout` for the reply.
    pub async fn get_chats(&self) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetChats(send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let chats = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(chats)
    }

    /// gets all chats, ordered (see `Command::GetChatsOrdered`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn get_chats_ordered(&self) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let chats = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(chats)
    }

    /// gets all chats, ordered, with their latest messages (see
    /// `Command::GetChatsOrderedWithLatestMessages`), waiting at most `self.timeout` for the reply.
    pub async fn get_chats_ordered_with_latest_messages(
        &self,
    ) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(
            Command::GetChatsOrderedWithLatestMessages(send),
        ))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let chats = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(chats)
    }

    /// gets all chats, ordered, with their latest messages and users (see
    /// `Command::GetChatsOrderedWithLatestMessagesAndUsers`), waiting at most `self.timeout`
    /// for the reply.
    pub async fn get_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(
            Command::GetChatsOrderedWithLatestMessagesAndUsers(send),
        ))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let chats = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(chats)
    }

    /// gets the chat for `jid` (see `Command::GetChat`), waiting at most `self.timeout` for
    /// the reply.
    pub async fn get_chat(&self, jid: JID) -> Result> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let chat = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(chat)
    }

    /// gets the message history of the chat with `jid` (see `Command::GetMessages`), waiting
    /// at most `self.timeout` for the reply.
    pub async fn get_messages(
        &self,
        jid: JID,
    ) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let messages = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(messages)
    }

    /// gets the message history of the chat with `jid`, including users (see
    /// `Command::GetMessagesWithUsers`), waiting at most `self.timeout` for the reply.
    pub async fn get_messages_with_users(
        &self,
        jid: JID,
    ) -> Result, CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetMessagesWithUsers(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let messages = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(messages)
    }

    /// deletes the chat with `jid` (see `Command::DeleteChat`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// deletes the message `id` (see `Command::DeleteMessage`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// gets the user `jid` (see `Command::GetUser`), waiting at most `self.timeout` for the
    /// reply.
    pub async fn get_user(&self, jid: JID) -> Result> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// adds `jid` to the roster (see `Command::AddContact`), waiting at most `self.timeout`
    /// for the reply.
    pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// sends a buddy request to `jid` (see `Command::BuddyRequest`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// sends a subscription request to `jid` (see `Command::SubscriptionRequest`), waiting at
    /// most `self.timeout` for the reply.
    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// accepts a buddy request from `jid` (see `Command::AcceptBuddyRequest`), waiting at
    /// most `self.timeout` for the reply.
    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// accepts a subscription request from `jid` (see `Command::AcceptSubscriptionRequest`),
    /// waiting at most `self.timeout` for the reply.
    pub async fn accept_subscription_request(
        &self,
        jid: JID,
    ) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(
            Command::AcceptSubscriptionRequest(jid, send),
        ))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// unsubscribes from `jid` (see `Command::UnsubscribeFromContact`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// stops `jid` from being subscribed (see `Command::UnsubscribeContact`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// removes subscriptions to and from `jid` (see `Command::UnfriendContact`), waiting at
    /// most `self.timeout` for the reply.
    pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::UnfriendContact(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// deletes `jid` from the roster (see `Command::DeleteContact`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DeleteContact(
            jid, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// updates the contact `jid` with `update` (see `Command::UpdateContact`), waiting at
    /// most `self.timeout` for the reply.
    pub async fn update_contact(
        &self,
        jid: JID,
        update: ContactUpdate,
    ) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::UpdateContact(
            jid, update, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// sets the online status (see `Command::SetStatus`), waiting at most `self.timeout` for
    /// the reply.
    pub async fn set_status(&self, online: Online) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// queues a message with `body` for `jid` (see `Command::SendMessage`).
    ///
    /// unlike the request methods, no reply channel is involved: this returns as soon as the
    /// command is queued and only fails if queueing fails.
    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), ActorError> {
        self.send(CoreClientCommand::Command(Command::SendMessage(jid, body)))
            .await?;
        Ok(())
    }

    /// runs a disco info query for the optional `jid` and `node` (see `Command::DiscoInfo`),
    /// waiting at most `self.timeout` for the reply.
    pub async fn disco_info(
        &self,
        jid: Option,
        node: Option,
    ) -> Result> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DiscoInfo(
            jid, node, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// runs a disco items query for the optional `jid` and `node` (see `Command::DiscoItems`),
    /// waiting at most `self.timeout` for the reply.
    pub async fn disco_items(
        &self,
        jid: Option,
        node: Option,
    ) -> Result> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DiscoItems(
            jid, node, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// publishes `item` to the pep node `node` (see `Command::PublishPEPItem`), waiting at
    /// most `self.timeout` for the reply.
    pub async fn publish(
        &self,
        item: pep::Item,
        node: String,
    ) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::PublishPEPItem {
            item,
            node,
            sender: send,
        }))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// deletes the pep node `node` (see `Command::DeletePEPNode`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn delete_pep_node(&self, node: String) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::DeletePEPNode {
            node,
            sender: send,
        }))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// gets the item `id` from the pep node `node`, optionally on `jid` (see
    /// `Command::GetPEPItem`), waiting at most `self.timeout` for the reply.
    pub async fn get_pep_item(
        &self,
        jid: Option,
        node: String,
        id: String,
    ) -> Result> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::GetPEPItem {
            jid,
            node,
            id,
            sender: send,
        }))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// changes the client user's nickname (see `Command::ChangeNick`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn change_nick(&self, nick: Option) -> Result<(), CommandError> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send)))
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }

    /// changes the client user's avatar (see `Command::ChangeAvatar`), waiting at most
    /// `self.timeout` for the reply.
    pub async fn change_avatar(
        &self,
        avatar: Option>,
    ) -> Result<(), CommandError>> {
        let (send, recv) = oneshot::channel();
        self.send(CoreClientCommand::Command(Command::ChangeAvatar(
            avatar, send,
        )))
        .await
        .map_err(|e| CommandError::Actor(Into::::into(e)))?;
        let result = timeout(self.timeout, recv)
            .await
            .map_err(|e| CommandError::Actor(Into::::into(e)))?
            .map_err(|e| CommandError::Actor(Into::::into(e)))??;
        Ok(result)
    }
}

/// wraps a `Command` in `CoreClientCommand::Command`, so a bare command can be converted
/// with `.into()` wherever a `CoreClientCommand` is expected.
impl From> for CoreClientCommand> {
    fn from(value: Command) -> Self {
        CoreClientCommand::Command(value)
    }
}