diff options
Diffstat (limited to 'filamento/src/lib.rs')
-rw-r--r-- | filamento/src/lib.rs | 1598 |
1 files changed, 1598 insertions, 0 deletions
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs new file mode 100644 index 0000000..db59a67 --- /dev/null +++ b/filamento/src/lib.rs @@ -0,0 +1,1598 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use chat::{Body, Chat, Message}; +use chrono::Utc; +use db::Db; +use error::{ + ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, + RosterError, StatusError, +}; +use futures::FutureExt; +use jid::JID; +use lampada::{ + Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, + error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, +}; +use presence::{Offline, Online, Presence, PresenceType, Show}; +use roster::{Contact, ContactUpdate}; +use stanza::client::{ + Stanza, + iq::{self, Iq, IqType}, +}; +use tokio::{ + sync::{Mutex, mpsc, oneshot}, + time::timeout, +}; +use tracing::{debug, info}; +use user::User; +use uuid::Uuid; + +pub mod chat; +pub mod db; +pub mod error; +pub mod presence; +pub mod roster; +pub mod user; + +pub enum Command { + /// get the roster. if offline, retreive cached version from database. should be stored in application memory + GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), + /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) + // TODO: paging and filtering + GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), + /// get a specific chat by jid + GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), + /// get message history for chat (does appropriate mam things) + // TODO: paging and filtering + GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), + /// delete a chat from your chat history, along with all the corresponding messages + DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), + /// delete a message from your chat history + DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>), + /// get a user from your users database + GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>), + /// add a contact to your roster, with a status of none, no subscriptions. + AddContact(JID, oneshot::Sender<Result<(), RosterError>>), + /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. + BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. + SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. + AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. + AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), + /// unsubscribe to a contact, but don't remove their subscription. + UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// stop a contact from being subscribed, but stay subscribed to the contact. + UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// remove subscriptions to and from contact, but keep in roster. + UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>), + /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. + DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>), + /// update contact. contact details will be overwritten with the contents of the contactupdate struct. + UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>), + /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. + SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), + /// send presence stanza + // TODO: cache presence stanza + SendPresence( + Option<JID>, + PresenceType, + oneshot::Sender<Result<(), WriteError>>, + ), + /// send a directed presence (usually to a non-contact). + // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) + /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a + /// chatroom). if disconnected, will be cached so when client connects, message will be sent. + SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), +} +/// an xmpp client that is suited for a chat client use case +#[derive(Debug)] +pub struct Client { + sender: mpsc::Sender<CoreClientCommand<Command>>, + timeout: Duration, +} + +impl Clone for Client { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + timeout: self.timeout, + } + } +} + +impl Deref for Client { + type Target = mpsc::Sender<CoreClientCommand<Command>>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl Client { + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(CoreClientCommand::Connect).await?; + Ok(()) + } + + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(CoreClientCommand::Disconnect).await?; + Ok(()) + } + + pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { + let (command_sender, command_receiver) = mpsc::channel(20); + let (update_send, update_recv) = mpsc::channel(20); + + // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) + let (_sup_send, sup_recv) = oneshot::channel(); + let sup_recv = sup_recv.fuse(); + + let logic = ClientLogic { + db, + pending: Arc::new(Mutex::new(HashMap::new())), + update_sender: update_send, + }; + + let actor: CoreClient<ClientLogic> = + CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); + tokio::spawn(async move { actor.run().await }); + + ( + Self { + sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), + }, + update_recv, + ) + } + + pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetRoster(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let roster = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(roster) + } + + pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChats(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered_with_latest_messages( + &self, + ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command( + Command::GetChatsOrderedWithLatestMessages(send), + )) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + + pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chat = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chat) + } + + pub async fn get_messages( + &self, + jid: JID, + ) -> Result<Vec<Message>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetMessages(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let messages = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(messages) + } + + pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetUser(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::AddContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SubscriptionRequest( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn accept_subscription_request( + &self, + jid: JID, + ) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command( + Command::AcceptSubscriptionRequest(jid, send), + )) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnsubscribeContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UnfriendContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeleteContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn update_contact( + &self, + jid: JID, + update: ContactUpdate, + ) -> Result<(), CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::UpdateContact( + jid, update, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SetStatus(online, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::SendMessage( + jid, body, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } +} + +#[derive(Clone)] +pub struct ClientLogic { + db: Db, + pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + update_sender: mpsc::Sender<UpdateMessage>, +} + +impl Logic for ClientLogic { + type Cmd = Command; + + async fn handle_connect(self, connection: Connected) { + let (send, recv) = oneshot::channel(); + debug!("getting roster"); + self.clone() + .handle_online(Command::GetRoster(send), connection.clone()) + .await; + debug!("sent roster req"); + let roster = recv.await; + debug!("got roster"); + match roster { + Ok(r) => match r { + Ok(roster) => { + let online = self.db.read_cached_status().await; + let online = match online { + Ok(online) => online, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::StatusCacheError(e.into()), + ))) + .await; + Online::default() + } + }; + let (send, recv) = oneshot::channel(); + self.clone() + .handle_online( + Command::SendPresence(None, PresenceType::Online(online.clone()), send), + connection, + ) + .await; + let set_status = recv.await; + match set_status { + Ok(s) => match s { + Ok(()) => { + let _ = self + .update_sender + .send(UpdateMessage::Online(online, roster)) + .await; + } + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::SendPresence(WriteError::Actor(e.into())), + ))) + .await; + } + } + } + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( + e.into(), + ))), + ))) + .await; + } + } + } + + async fn handle_disconnect(self, connection: Connected) { + // TODO: be able to set offline status message + let offline_presence: stanza::client::presence::Presence = + Offline::default().into_stanza(None); + let stanza = Stanza::Presence(offline_presence); + // TODO: timeout and error check + connection.write_handle().write(stanza).await; + let _ = self + .update_sender + .send(UpdateMessage::Offline(Offline::default())) + .await; + } + + async fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) { + match stanza { + Stanza::Message(stanza_message) => { + if let Some(mut from) = stanza_message.from { + // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. + let timestamp = stanza_message + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + // TODO: group chat messages + let mut message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + timestamp, + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = self + .db + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + tracing::error!("messagecreate"); + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MessageHistory(e.into()), + ))) + .await; + } + message.from = message.from.as_bare(); + from = from.as_bare(); + let _ = self + .update_sender + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MissingFrom, + ))) + .await; + } + } + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + ), + ))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = self + .update_sender + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Offline(offline), + }, + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Online(online), + }, + }) + .await; + } + } + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } + } + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = self.pending.lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + self.db.delete_contact(item.jid.clone()).await; + self.update_sender + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = + self.db.upsert_contact(contact.clone()).await + { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), + ))) + .await; + } + let _ = self + .update_sender + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect + } + Stanza::OtherContent(content) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::UnrecognizedContent)); + // TODO: send error to write_thread + } + } + } + + async fn handle_online(self, command: Command, connection: Connected) { + match command { + Command::GetRoster(result_sender) => { + // TODO: jid resource should probably be stored within the connection + debug!("before client_jid lock"); + debug!("after client_jid lock"); + let iq_id = Uuid::new_v4().to_string(); + let (send, iq_recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + let _ = connection + .write_handle() + .send(WriteMessage { + stanza, + respond_to: send, + }) + .await; + // TODO: timeout + match recv.await { + Ok(Ok(())) => info!("roster request sent"), + Ok(Err(e)) => { + // TODO: log errors if fail to send + let _ = result_sender.send(Err(RosterError::Write(e.into()))); + return; + } + Err(e) => { + let _ = result_sender + .send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + }; + // TODO: timeout + match iq_recv.await { + Ok(Ok(stanza)) => match stanza { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = + items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { + self.update_sender + .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( + e.into(), + )))) + .await; + }; + result_sender.send(Ok(contacts)); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + result_sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + result_sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Ok(Err(e)) => { + result_sender.send(Err(RosterError::Read(e))); + return; + } + Err(e) => { + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + // TODO: write_handle send helper function + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::BuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::SubscriptionRequest(jid, sender) => { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::AcceptBuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::AcceptSubscriptionRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeFromContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnfriendContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::DeleteContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::UpdateContact(jid, contact_update, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + self.pending.lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::SetStatus(online, sender) => { + let result = self.db.upsert_cached_status(online.clone()).await; + if let Err(e) = result { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( + e.into(), + )))) + .await; + } + let result = connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await + .map_err(|e| StatusError::Write(e)); + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(jid, body, sender) => { + let id = Uuid::new_v4(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + delay: None, + }); + let _ = sender.send(Ok(())); + // let _ = sender.send(Ok(message.clone())); + let result = connection.write_handle().write(message).await; + match result { + Ok(_) => { + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp: Utc::now(), + }; + info!("send message {:?}", message); + if let Err(e) = self + .db + .create_message_with_self_resource_and_chat( + message.clone(), + jid.clone(), + ) + .await + .map_err(|e| e.into()) + { + tracing::error!("{}", e); + let _ = + self.update_sender + .send(UpdateMessage::Error(Error::MessageSend( + error::MessageSendError::MessageHistory(e), + ))); + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = self + .update_sender + .send(UpdateMessage::Message { to: jid, message }) + .await; + } + Err(_) => { + // let _ = sender.send(result); + } + } + } + Command::SendPresence(jid, presence, sender) => { + let mut presence: stanza::client::presence::Presence = presence.into(); + if let Some(jid) = jid { + presence.to = Some(jid); + }; + let result = connection + .write_handle() + .write(Stanza::Presence(presence)) + .await; + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + } + } + + async fn handle_offline(self, command: Command) { + match command { + Command::GetRoster(sender) => { + let roster = self.db.read_cached_roster().await; + match roster { + Ok(roster) => { + let _ = sender.send(Ok(roster)); + } + Err(e) => { + let _ = sender.send(Err(RosterError::Cache(e.into()))); + } + } + } + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::BuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptBuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptSubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeFromContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnfriendContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::DeleteContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::UpdateContact(_jid, _contact_update, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::SetStatus(online, sender) => { + let result = self + .db + .upsert_cached_status(online) + .await + .map_err(|e| StatusError::Cache(e.into())); + sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(_jid, _body, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SendPresence(_jid, _presence, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + } + } + // pub async fn handle_stream_error(self, error) {} + // stanza errors (recoverable) + // pub async fn handle_error(self, error: Error) {} + // when it aborts, must clear iq map no matter what + async fn on_abort(self) { + let mut iqs = self.pending.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } + } + + async fn handle_connection_error(self, error: ConnectionError) { + self.update_sender + .send(UpdateMessage::Error( + ConnectionError::AlreadyConnected.into(), + )) + .await; + } +} + +impl From<Command> for CoreClientCommand<Command> { + fn from(value: Command) -> Self { + CoreClientCommand::Command(value) + } +} + +#[derive(Debug, Clone)] +pub enum UpdateMessage { + Error(Error), + Online(Online, Vec<Contact>), + Offline(Offline), + /// received roster from jabber server (replace full app roster state with this) + /// is this needed? + FullRoster(Vec<Contact>), + /// (only update app roster state, don't replace) + RosterUpdate(Contact), + RosterDelete(JID), + /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone + Presence { + from: JID, + presence: Presence, + }, + // TODO: receipts + // MessageDispatched(Uuid), + Message { + to: JID, + message: Message, + }, + SubscriptionRequest(jid::JID), +} |