use std::{ collections::HashMap, ops::{Deref, DerefMut}, str::FromStr, sync::Arc, time::Duration, }; use chat::{Body, Chat, Message}; use chrono::Utc; use db::Db; use error::{ ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError, StatusError, }; use futures::FutureExt; use jid::JID; use lampada::{ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, }; use presence::{Offline, Online, Presence, PresenceType, Show}; use roster::{Contact, ContactUpdate}; use stanza::client::{ Stanza, iq::{self, Iq, IqType}, }; use tokio::{ sync::{Mutex, mpsc, oneshot}, time::timeout, }; use tracing::{debug, info}; use user::User; use uuid::Uuid; pub mod chat; pub mod db; pub mod error; pub mod presence; pub mod roster; pub mod user; pub enum Command { /// get the roster. if offline, retreive cached version from database. should be stored in application memory GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), /// get all chats. 
chat will include 10 messages in their message Vec (enough for chat previews) // TODO: paging and filtering GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), // TODO: paging and filtering GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), // TODO: paging and filtering GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), /// get a specific chat by jid GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), /// get message history for chat (does appropriate mam things) // TODO: paging and filtering GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), /// delete a chat from your chat history, along with all the corresponding messages DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), /// delete a message from your chat history DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>), /// get a user from your users database GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>), /// add a contact to your roster, with a status of none, no subscriptions. AddContact(JID, oneshot::Sender<Result<(), RosterError>>), /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. 
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), /// unsubscribe to a contact, but don't remove their subscription. UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>), /// stop a contact from being subscribed, but stay subscribed to the contact. UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>), /// remove subscriptions to and from contact, but keep in roster. UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>), /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>), /// update contact. contact details will be overwritten with the contents of the contactupdate struct. UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>), /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), /// send presence stanza // TODO: cache presence stanza SendPresence( Option<JID>, PresenceType, oneshot::Sender<Result<(), WriteError>>, ), /// send a directed presence (usually to a non-contact). // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a /// chatroom). if disconnected, will be cached so when client connects, message will be sent. 
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), } /// an xmpp client that is suited for a chat client use case #[derive(Debug)] pub struct Client { sender: mpsc::Sender<CoreClientCommand<Command>>, timeout: Duration, } impl Clone for Client { fn clone(&self) -> Self { Self { sender: self.sender.clone(), timeout: self.timeout, } } } impl Deref for Client { type Target = mpsc::Sender<CoreClientCommand<Command>>; fn deref(&self) -> &Self::Target { &self.sender } } impl DerefMut for Client { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.sender } } impl Client { pub async fn connect(&self) -> Result<(), ActorError> { self.send(CoreClientCommand::Connect).await?; Ok(()) } pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { self.send(CoreClientCommand::Disconnect).await?; Ok(()) } pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { let (command_sender, command_receiver) = mpsc::channel(20); let (update_send, update_recv) = mpsc::channel(20); // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) let (_sup_send, sup_recv) = oneshot::channel(); let sup_recv = sup_recv.fuse(); let logic = ClientLogic { db, pending: Arc::new(Mutex::new(HashMap::new())), update_sender: update_send, }; let actor: CoreClient<ClientLogic> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); tokio::spawn(async move { actor.run().await }); ( Self { sender: command_sender, // TODO: configure timeout timeout: Duration::from_secs(10), }, update_recv, ) } pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetRoster(send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let roster = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(roster) } pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetChats(send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let chats = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(chats) } pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let chats = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(chats) } pub async fn get_chats_ordered_with_latest_messages( &self, ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command( Command::GetChatsOrderedWithLatestMessages(send), )) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let chats = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(chats) } pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetChat(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let chat = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(chat) } pub async fn get_messages( &self, jid: JID, ) -> Result<Vec<Message>, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetMessages(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let messages = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(messages) } pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetUser(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::AddContact(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::SubscriptionRequest( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn accept_subscription_request( &self, jid: JID, ) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command( Command::AcceptSubscriptionRequest(jid, send), )) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::UnsubscribeContact( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::UnfriendContact( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::DeleteContact( jid, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn update_contact( &self, jid: JID, update: ContactUpdate, ) -> Result<(), CommandError<RosterError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::UpdateContact( jid, update, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::SetStatus(online, send))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::SendMessage( jid, body, send, ))) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; let result = timeout(self.timeout, recv) .await .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? 
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } } #[derive(Clone)] pub struct ClientLogic { db: Db, pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, update_sender: mpsc::Sender<UpdateMessage>, } impl Logic for ClientLogic { type Cmd = Command; async fn handle_connect(self, connection: Connected) { let (send, recv) = oneshot::channel(); debug!("getting roster"); self.clone() .handle_online(Command::GetRoster(send), connection.clone()) .await; debug!("sent roster req"); let roster = recv.await; debug!("got roster"); match roster { Ok(r) => match r { Ok(roster) => { let online = self.db.read_cached_status().await; let online = match online { Ok(online) => online, Err(e) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( ConnectionJobError::StatusCacheError(e.into()), ))) .await; Online::default() } }; let (send, recv) = oneshot::channel(); self.clone() .handle_online( Command::SendPresence(None, PresenceType::Online(online.clone()), send), connection, ) .await; let set_status = recv.await; match set_status { Ok(s) => match s { Ok(()) => { let _ = self .update_sender .send(UpdateMessage::Online(online, roster)) .await; } Err(e) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting(e.into()))) .await; } }, Err(e) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( ConnectionJobError::SendPresence(WriteError::Actor(e.into())), ))) .await; } } } Err(e) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting(e.into()))) .await; } }, Err(e) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Connecting( ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( e.into(), ))), ))) .await; } } } async fn handle_disconnect(self, connection: Connected) { // TODO: be able to set offline status message let offline_presence: stanza::client::presence::Presence = 
Offline::default().into_stanza(None); let stanza = Stanza::Presence(offline_presence); // TODO: timeout and error check connection.write_handle().write(stanza).await; let _ = self .update_sender .send(UpdateMessage::Offline(Offline::default())) .await; } async fn handle_stanza( self, stanza: Stanza, connection: Connected, supervisor: SupervisorSender, ) { match stanza { Stanza::Message(stanza_message) => { if let Some(mut from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); // TODO: group chat messages let mut message = Message { id: stanza_message .id // TODO: proper id storage .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) .unwrap_or_else(|| Uuid::new_v4()), from: from.clone(), timestamp, body: Body { // TODO: should this be an option? body: stanza_message .body .map(|body| body.body) .unwrap_or_default() .unwrap_or_default(), }, }; // TODO: can this be more efficient? let result = self .db .create_message_with_user_resource_and_chat(message.clone(), from.clone()) .await; if let Err(e) = result { tracing::error!("messagecreate"); let _ = self .update_sender .send(UpdateMessage::Error(Error::MessageRecv( MessageRecvError::MessageHistory(e.into()), ))) .await; } message.from = message.from.as_bare(); from = from.as_bare(); let _ = self .update_sender .send(UpdateMessage::Message { to: from, message }) .await; } else { let _ = self .update_sender .send(UpdateMessage::Error(Error::MessageRecv( MessageRecvError::MissingFrom, ))) .await; } } Stanza::Presence(presence) => { if let Some(from) = presence.from { match presence.r#type { Some(r#type) => match r#type { // error processing a presence from somebody stanza::client::presence::PresenceType::Error => { // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. 
maybe it shoulnd't be an option. let _ = self .update_sender .send(UpdateMessage::Error(Error::Presence( // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display PresenceError::StanzaError( presence .errors .first() .cloned() .expect("error MUST have error"), ), ))) .await; } // should not happen (error to server) stanza::client::presence::PresenceType::Probe => { // TODO: should probably write an error and restart stream let _ = self .update_sender .send(UpdateMessage::Error(Error::Presence( PresenceError::Unsupported, ))) .await; } stanza::client::presence::PresenceType::Subscribe => { // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event let _ = self .update_sender .send(UpdateMessage::SubscriptionRequest(from)) .await; } stanza::client::presence::PresenceType::Unavailable => { let offline = Offline { status: presence.status.map(|status| status.status.0), }; let timestamp = presence .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); let _ = self .update_sender .send(UpdateMessage::Presence { from, presence: Presence { timestamp, presence: PresenceType::Offline(offline), }, }) .await; } // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. 
stanza::client::presence::PresenceType::Subscribed => {} stanza::client::presence::PresenceType::Unsubscribe => {} stanza::client::presence::PresenceType::Unsubscribed => {} }, None => { let online = Online { show: presence.show.map(|show| match show { stanza::client::presence::Show::Away => Show::Away, stanza::client::presence::Show::Chat => Show::Chat, stanza::client::presence::Show::Dnd => Show::DoNotDisturb, stanza::client::presence::Show::Xa => Show::ExtendedAway, }), status: presence.status.map(|status| status.status.0), priority: presence.priority.map(|priority| priority.0), }; let timestamp = presence .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); let _ = self .update_sender .send(UpdateMessage::Presence { from, presence: Presence { timestamp, presence: PresenceType::Online(online), }, }) .await; } } } else { let _ = self .update_sender .send(UpdateMessage::Error(Error::Presence( PresenceError::MissingFrom, ))) .await; } } Stanza::Iq(iq) => match iq.r#type { stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { let send; { send = self.pending.lock().await.remove(&iq.id); } if let Some(send) = send { send.send(Ok(Stanza::Iq(iq))); } else { let _ = self .update_sender .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( iq.id, )))) .await; } } // TODO: send unsupported to server // TODO: proper errors i am so tired please stanza::client::iq::IqType::Get => {} stanza::client::iq::IqType::Set => { if let Some(query) = iq.query { match query { stanza::client::iq::Query::Roster(mut query) => { // TODO: there should only be one if let Some(item) = query.items.pop() { match item.subscription { Some(stanza::roster::Subscription::Remove) => { self.db.delete_contact(item.jid.clone()).await; self.update_sender .send(UpdateMessage::RosterDelete(item.jid)) .await; // TODO: send result } _ => { let contact: Contact = item.into(); if let Err(e) = self.db.upsert_contact(contact.clone()).await { let _ = self .update_sender 
.send(UpdateMessage::Error(Error::Roster( RosterError::Cache(e.into()), ))) .await; } let _ = self .update_sender .send(UpdateMessage::RosterUpdate(contact)) .await; // TODO: send result // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { // from: , // id: todo!(), // to: todo!(), // r#type: todo!(), // lang: todo!(), // query: todo!(), // errors: todo!(), // })); } } } } // TODO: send unsupported to server _ => {} } } else { // TODO: send error (unsupported) to server } } }, Stanza::Error(error) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::Stream(error))) .await; // TODO: reconnect } Stanza::OtherContent(content) => { let _ = self .update_sender .send(UpdateMessage::Error(Error::UnrecognizedContent)); // TODO: send error to write_thread } } } async fn handle_online(self, command: Command, connection: Connected) { match command { Command::GetRoster(result_sender) => { // TODO: jid resource should probably be stored within the connection debug!("before client_jid lock"); debug!("after client_jid lock"); let iq_id = Uuid::new_v4().to_string(); let (send, iq_recv) = oneshot::channel(); { self.pending.lock().await.insert(iq_id.clone(), send); } let stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: Vec::new(), })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); let _ = connection .write_handle() .send(WriteMessage { stanza, respond_to: send, }) .await; // TODO: timeout match recv.await { Ok(Ok(())) => info!("roster request sent"), Ok(Err(e)) => { // TODO: log errors if fail to send let _ = result_sender.send(Err(RosterError::Write(e.into()))); return; } Err(e) => { let _ = result_sender .send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } }; // TODO: timeout match iq_recv.await { Ok(Ok(stanza)) => match stanza { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: 
_, query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), errors: _, }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { self.update_sender .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( e.into(), )))) .await; }; result_sender.send(Ok(contacts)); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { result_sender.send(Err(RosterError::StanzaError(error.clone()))); } else { result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { result_sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Ok(Err(e)) => { result_sender.send(Err(RosterError::Read(e))); return; } Err(e) => { result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::GetChats(sender) => { let chats = self.db.read_chats().await.map_err(|e| e.into()); sender.send(chats); } Command::GetChatsOrdered(sender) => { let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { let chats = self .db .read_chats_ordered_with_latest_messages() .await .map_err(|e| e.into()); sender.send(chats); } Command::GetChat(jid, sender) => { let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); sender.send(chats); } Command::GetMessages(jid, sender) => { let messages = self .db .read_message_history(jid) .await .map_err(|e| e.into()); sender.send(messages); } Command::DeleteChat(jid, sender) => { let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); sender.send(result); } Command::DeleteMessage(uuid, sender) => { let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); sender.send(result); } Command::GetUser(jid, 
sender) => { let user = self.db.read_user(jid).await.map_err(|e| e.into()); sender.send(user); } // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: None, groups: Vec::new(), }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { self.pending.lock().await.insert(iq_id.clone(), send); } // TODO: write_handle send helper function let result = connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::BuddyRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = 
connection.write_handle().write(presence).await;
                match result {
                    Err(_) => {
                        let _ = sender.send(result);
                    }
                    Ok(()) => {
                        // the request went out; follow up with the pre-approval
                        let presence = Stanza::Presence(stanza::client::presence::Presence {
                            from: None,
                            id: None,
                            to: Some(jid),
                            r#type: Some(stanza::client::presence::PresenceType::Subscribed),
                            lang: None,
                            show: None,
                            status: None,
                            priority: None,
                            errors: Vec::new(),
                            delay: None,
                        });
                        let result = connection.write_handle().write(presence).await;
                        let _ = sender.send(result);
                    }
                }
            }
            Command::SubscriptionRequest(jid, sender) => {
                // TODO: i should probably have builders
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid),
                    r#type: Some(stanza::client::presence::PresenceType::Subscribe),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                let _ = sender.send(result);
            }
            Command::AcceptBuddyRequest(jid, sender) => {
                // approve their request first...
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid.clone()),
                    r#type: Some(stanza::client::presence::PresenceType::Subscribed),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                match result {
                    Err(_) => {
                        let _ = sender.send(result);
                    }
                    Ok(()) => {
                        // ...then request a subscription back
                        let presence = Stanza::Presence(stanza::client::presence::Presence {
                            from: None,
                            id: None,
                            to: Some(jid),
                            r#type: Some(stanza::client::presence::PresenceType::Subscribe),
                            lang: None,
                            show: None,
                            status: None,
                            priority: None,
                            errors: Vec::new(),
                            delay: None,
                        });
                        let result = connection.write_handle().write(presence).await;
                        let _ = sender.send(result);
                    }
                }
            }
            Command::AcceptSubscriptionRequest(jid, sender) => {
                // approving a pending request is a presence of type `subscribed`; `subscribe`
                // would instead send a new subscription request to the contact, which is
                // exactly what this command promises not to do
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid),
                    r#type: Some(stanza::client::presence::PresenceType::Subscribed),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                let _ = sender.send(result);
            }
            Command::UnsubscribeFromContact(jid, sender) => {
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid),
                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                let _ = sender.send(result);
            }
            Command::UnsubscribeContact(jid, sender) => {
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid),
                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                let _ = sender.send(result);
            }
            Command::UnfriendContact(jid, sender) => {
                // drop our subscription to them first...
                let presence = Stanza::Presence(stanza::client::presence::Presence {
                    from: None,
                    id: None,
                    to: Some(jid.clone()),
                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
                    lang: None,
                    show: None,
                    status: None,
                    priority: None,
                    errors: Vec::new(),
                    delay: None,
                });
                let result = connection.write_handle().write(presence).await;
                match result {
                    Err(_) => {
                        let _ = sender.send(result);
                    }
                    Ok(()) => {
                        // ...then cancel their subscription to us
                        let presence = Stanza::Presence(stanza::client::presence::Presence {
                            from: None,
                            id: None,
                            to: Some(jid),
                            r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
                            lang: None,
                            show: None,
                            status: None,
                            priority: None,
                            errors: Vec::new(),
                            delay: None,
                        });
                        let result = connection.write_handle().write(presence).await;
                        let _ = sender.send(result);
                    }
                }
            }
            Command::DeleteContact(jid, sender) => {
                let iq_id = Uuid::new_v4().to_string();
                let set_stanza = Stanza::Iq(Iq {
                    from: Some(connection.jid().clone()),
                    id: iq_id.clone(),
                    to: None,
                    r#type: IqType::Set,
                    lang: None,
                    query:
Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: Some(stanza::roster::Subscription::Remove), groups: Vec::new(), }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { self.pending.lock().await.insert(iq_id.clone(), send); } let result = connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::UpdateContact(jid, contact_update, sender) => { let iq_id = Uuid::new_v4().to_string(); let groups = Vec::from_iter( contact_update .groups .into_iter() .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: contact_update.name, subscription: None, groups, }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { self.pending.lock().await.insert(iq_id.clone(), send); } let result = 
connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::SetStatus(online, sender) => { let result = self.db.upsert_cached_status(online.clone()).await; if let Err(e) = result { let _ = self .update_sender .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( e.into(), )))) .await; } let result = connection .write_handle() .write(Stanza::Presence(online.into_stanza(None))) .await .map_err(|e| StatusError::Write(e)); // .map_err(|e| StatusError::Write(e)); let _ = sender.send(result); } // TODO: offline message queue Command::SendMessage(jid, body, sender) => { let id = Uuid::new_v4(); let message = Stanza::Message(stanza::client::message::Message { from: Some(connection.jid().clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? 
lang: None,
                    subject: None,
                    body: Some(stanza::client::message::Body {
                        lang: None,
                        body: Some(body.body.clone()),
                    }),
                    thread: None,
                    delay: None,
                });
                let _ = sender.send(Ok(()));
                // let _ = sender.send(Ok(message.clone()));
                let result = connection.write_handle().write(message).await;
                match result {
                    Ok(_) => {
                        let mut message = Message {
                            id,
                            from: connection.jid().clone(),
                            body,
                            timestamp: Utc::now(),
                        };
                        info!("send message {:?}", message);
                        if let Err(e) = self
                            .db
                            .create_message_with_self_resource_and_chat(
                                message.clone(),
                                jid.clone(),
                            )
                            .await
                            .map_err(|e| e.into())
                        {
                            tracing::error!("{}", e);
                            // NOTE(review): unlike the send just below, this one is not
                            // `.await`ed, so the error update is presumably never delivered
                            // -- confirm and add `.await` if so.
                            let _ =
                                self.update_sender
                                    .send(UpdateMessage::Error(Error::MessageSend(
                                        error::MessageSendError::MessageHistory(e),
                                    )));
                        }
                        // TODO: don't do this, have separate from from details
                        message.from = message.from.as_bare();
                        let _ = self
                            .update_sender
                            .send(UpdateMessage::Message { to: jid, message })
                            .await;
                    }
                    Err(_) => {
                        // let _ = sender.send(result);
                    }
                }
            }
            Command::SendPresence(jid, presence, sender) => {
                let mut presence: stanza::client::presence::Presence = presence.into();
                if let Some(jid) = jid {
                    presence.to = Some(jid);
                };
                let result = connection
                    .write_handle()
                    .write(Stanza::Presence(presence))
                    .await;
                // .map_err(|e| StatusError::Write(e));
                let _ = sender.send(result);
            }
        }
    }

    /// Handles `command` while there is no connection to write to.
    ///
    /// Commands that only touch the local database (roster cache, chats,
    /// messages, users, cached status) are answered from `self.db`. Commands
    /// that would have to write a stanza reply with `WriteError::Disconnected`
    /// (wrapped in `RosterError::Write` for the roster commands).
    ///
    /// Every reply is best-effort: if the requester already dropped its
    /// receiver, the failed `send` is deliberately ignored.
    async fn handle_offline(self, command: Command) {
        match command {
            Command::GetRoster(sender) => {
                let roster = self
                    .db
                    .read_cached_roster()
                    .await
                    .map_err(|e| RosterError::Cache(e.into()));
                let _ = sender.send(roster);
            }
            Command::GetChats(sender) => {
                let chats = self.db.read_chats().await.map_err(|e| e.into());
                let _ = sender.send(chats);
            }
            Command::GetChatsOrdered(sender) => {
                let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
                let _ = sender.send(chats);
            }
            Command::GetChatsOrderedWithLatestMessages(sender) => {
                let chats = self
                    .db
                    .read_chats_ordered_with_latest_messages()
                    .await
                    .map_err(|e| e.into());
                let _ = sender.send(chats);
            }
            Command::GetChat(jid, sender) => {
                let chat = self.db.read_chat(jid).await.map_err(|e| e.into());
                let _ = sender.send(chat);
            }
            Command::GetMessages(jid, sender) => {
                let messages = self
                    .db
                    .read_message_history(jid)
                    .await
                    .map_err(|e| e.into());
                let _ = sender.send(messages);
            }
            Command::DeleteChat(jid, sender) => {
                let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
                let _ = sender.send(result);
            }
            Command::DeleteMessage(uuid, sender) => {
                let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
                let _ = sender.send(result);
            }
            Command::GetUser(jid, sender) => {
                let user = self.db.read_user(jid).await.map_err(|e| e.into());
                let _ = sender.send(user);
            }
            // TODO: offline queue to modify roster
            Command::AddContact(_jid, sender) => {
                let _ = sender.send(Err(RosterError::Write(WriteError::Disconnected)));
            }
            Command::BuddyRequest(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::SubscriptionRequest(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::AcceptBuddyRequest(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::AcceptSubscriptionRequest(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::UnsubscribeFromContact(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::UnsubscribeContact(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::UnfriendContact(_jid, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::DeleteContact(_jid, sender) => {
                let _ = sender.send(Err(RosterError::Write(WriteError::Disconnected)));
            }
            Command::UpdateContact(_jid, _contact_update, sender) => {
                let _ = sender.send(Err(RosterError::Write(WriteError::Disconnected)));
            }
            // Offline, only the cached status is updated; nothing is written to the server.
            Command::SetStatus(online, sender) => {
                let result = self
                    .db
                    .upsert_cached_status(online)
                    .await
                    .map_err(|e| StatusError::Cache(e.into()));
                let _ = sender.send(result);
            }
            // TODO: offline message queue
            Command::SendMessage(_jid, _body, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
            Command::SendPresence(_jid, _presence, sender) => {
                let _ = sender.send(Err(WriteError::Disconnected));
            }
        }
    }

    // pub async fn handle_stream_error(self, error) {}
    // stanza errors (recoverable)
    // pub async fn handle_error(self, error: Error) {}

    /// Fails every pending iq with `ReadError::LostConnection`.
    ///
    /// when it aborts, must clear iq map no matter what
    async fn on_abort(self) {
        let mut iqs = self.pending.lock().await;
        for (_id, sender) in iqs.drain() {
            // The waiter may already be gone; nothing more to do in that case.
            let _ = sender.send(Err(ReadError::LostConnection));
        }
    }

    /// Reports a connection error to the application through the update channel.
    ///
    /// The error that was actually received is forwarded, so the application
    /// can tell the different connection failures apart.
    async fn handle_connection_error(self, error: ConnectionError) {
        // Best-effort: if the update receiver is closed there is nobody left to notify.
        let _ = self
            .update_sender
            .send(UpdateMessage::Error(error.into()))
            .await;
    }
}

/// Wraps a [`Command`] in `CoreClientCommand::Command`.
impl From<Command> for CoreClientCommand<Command> {
    fn from(value: Command) -> Self {
        CoreClientCommand::Command(value)
    }
}

/// Updates sent over the client's update channel.
#[derive(Debug, Clone)]
pub enum UpdateMessage {
    /// an error to report to the application
    Error(Error),
    Online(Online, Vec<Contact>),
    Offline(Offline),
    /// received roster from jabber server (replace full app roster state with this)
    /// is this needed?
    FullRoster(Vec<Contact>),
    /// (only update app roster state, don't replace)
    RosterUpdate(Contact),
    RosterDelete(JID),
    /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
    Presence {
        from: JID,
        presence: Presence,
    },
    // TODO: receipts
    // MessageDispatched(Uuid),
    /// a chat message; `to` is the chat it belongs to
    Message {
        to: JID,
        message: Message,
    },
    SubscriptionRequest(JID),
}