diff options
Diffstat (limited to 'filamento/src/logic')
-rw-r--r-- | filamento/src/logic/connect.rs | 2 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 16 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 37 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 101 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 83 |
5 files changed, 217 insertions, 22 deletions
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index 37cdad5..9d61ca4 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -19,7 +19,7 @@ pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( debug!("getting roster"); logic .clone() - .handle_online(Command::GetRoster(send), connection.clone()) + .handle_online(Command::GetRosterWithUsers(send), connection.clone()) .await; debug!("sent roster req"); let roster = recv.await; diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index cabbef4..dc94d2c 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -28,6 +28,15 @@ pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone Ok(logic.db().read_chats_ordered_with_latest_messages().await?) } +pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> { + Ok(logic + .db() + .read_chats_ordered_with_latest_messages_and_users() + .await?) +} + pub async fn handle_get_chat<Fs: FileStore + Clone>( logic: &ClientLogic<Fs>, jid: JID, @@ -42,6 +51,13 @@ pub async fn handle_get_messages<Fs: FileStore + Clone>( Ok(logic.db().read_message_history(jid).await?) } +pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<Vec<(Message, User)>, DatabaseError> { + Ok(logic.db().read_message_history_with_users(jid).await?) +} + pub async fn handle_delete_chat<Fs: FileStore + Clone>( logic: &ClientLogic<Fs>, jid: JID, diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 566972c..b87484c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -2,6 +2,7 @@ use std::process::id; use chrono::Utc; use lampada::error::WriteError; +use tracing::error; use uuid::Uuid; use crate::{ @@ -14,6 +15,7 @@ use crate::{ files::FileStore, presence::Online, roster::Contact, + user::User, }; use super::{ @@ -21,7 +23,8 @@ use super::{ local_only::{ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_messages, handle_get_user, + handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, + handle_get_messages_with_users, handle_get_user, }, }; @@ -47,6 +50,12 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>( Ok(logic.db().read_cached_roster().await?) } +pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<(Contact, User)>, RosterError> { + Ok(logic.db().read_cached_roster_with_users().await?) +} + pub async fn handle_offline_result<Fs: FileStore + Clone>( logic: &ClientLogic<Fs>, command: Command<Fs>, @@ -56,6 +65,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( let roster = handle_get_roster(logic).await; sender.send(roster); } + Command::GetRosterWithUsers(sender) => { + let roster = handle_get_roster_with_users(logic).await; + sender.send(roster); + } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; sender.send(chats); @@ -68,6 +81,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( let chats = handle_get_chats_ordered_with_latest_messages(logic).await; sender.send(chats); } + Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { + let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; + sender.send(chats); + } Command::GetChat(jid, sender) => { let chats = handle_get_chat(logic, jid).await; sender.send(chats); @@ -76,6 +93,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( let messages = handle_get_messages(logic, jid).await; sender.send(messages); } + Command::GetMessagesWithUsers(jid, sender) => { + let messages = handle_get_messages_with_users(logic, jid).await; + sender.send(messages); + } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; sender.send(result); @@ -151,11 +172,25 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( .handle_error(MessageSendError::MessageHistory(e.into()).into()) .await; } + + let from = match logic.db().read_user(logic.bare_jid.clone()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: logic.bare_jid.clone(), + nick: None, + avatar: None, + } + } + }; + logic .update_sender() .send(crate::UpdateMessage::Message { to: jid.as_bare(), message, + from, }) .await; } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index d9441d7..767f923 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -18,16 +18,13 @@ use uuid::Uuid; use crate::{ avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError - }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage + }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage }; use super::{ - ClientLogic, local_only::{ - handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, - handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_messages, handle_get_user, - }, + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user + }, ClientLogic }; pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) { @@ -97,6 +94,71 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>( } } +// this can't query the client... otherwise there is a hold-up and the connection can't complete +pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, +) -> Result<Vec<(Contact, User)>, RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, stanza, iq_id.clone()) + .await?; + // TODO: timeout + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .handle_error(Error::Roster(RosterError::Cache(e.into()))) + .await; + }; + let mut users = Vec::new(); + for contact in &contacts { + let user = logic.db().read_user(contact.user_jid.clone()).await?; + users.push(user); + } + Ok(contacts.into_iter().zip(users).collect()) + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + pub async fn handle_add_contact<Fs: FileStore + Clone>( logic: &ClientLogic<Fs>, connection: Connected, @@ -470,12 +532,25 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, .await; } + let from = match logic.db().read_user(logic.bare_jid.clone()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: logic.bare_jid.clone(), + nick: None, + avatar: None, + } + }, + }; + // tell the client a message is being sent logic .update_sender() .send(UpdateMessage::Message { to: jid.as_bare(), message, + from, }) .await; @@ -513,6 +588,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Written, + chat: jid.clone(), }) .await; if mark_chat_as_chatted { @@ -530,6 +606,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Failed, + chat: jid, }) .await; logic.handle_error(MessageSendError::Write(e).into()).await; @@ -1006,6 +1083,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>( let roster = handle_get_roster(logic, connection).await; let _ = result_sender.send(roster); } + Command::GetRosterWithUsers(result_sender) => { + let roster = handle_get_roster_with_users(logic, connection).await; + let _ = result_sender.send(roster); + } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; let _ = sender.send(chats); @@ -1018,6 +1099,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>( let chats = handle_get_chats_ordered_with_latest_messages(logic).await; let _ = sender.send(chats); } + Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { + let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; + sender.send(chats); + } Command::GetChat(jid, sender) => { let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); @@ -1026,6 +1111,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>( let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); } + Command::GetMessagesWithUsers(jid, sender) => { + let messages = handle_get_messages_with_users(logic, jid).await; + sender.send(messages); + } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; let _ = sender.send(result); diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 9c49b04..cdaff97 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -20,12 +20,13 @@ use crate::{ UpdateMessage, caps, chat::{Body, Message}, error::{ - AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, - RosterError, + AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError, + PresenceError, RosterError, }, files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, + user::User, }; use super::ClientLogic; @@ -103,11 +104,24 @@ pub async fn recv_message<Fs: FileStore + Clone>( } }; + let from_user = match logic.db().read_user(from.as_bare()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: from.as_bare(), + nick: None, + avatar: None, + } + } + }; + // update the client with the new message logic .update_sender() .send(UpdateMessage::Message { to: from.as_bare(), + from: from_user, message, }) .await; @@ -541,11 +555,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( logic: ClientLogic<Fs>, connection: Connected, iq: Iq, -) -> Result<Option<UpdateMessage>, IqError> { +) -> Result<Option<UpdateMessage>, IqProcessError> { if let Some(to) = &iq.to { if *to == *connection.jid() { } else { - return Err(IqError::IncorrectAddressee(to.clone())); + return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone()))); } } match iq.r#type { @@ -556,7 +570,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( .unwrap_or_else(|| connection.server().clone()); let id = iq.id.clone(); debug!("received iq result with id `{}` from {}", id, from); - logic.pending().respond(Stanza::Iq(iq), id).await?; + logic + .pending() + .respond(Stanza::Iq(iq), id) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } stanza::client::iq::IqType::Get => { @@ -596,7 +614,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -612,7 +634,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -627,7 +653,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: Some(iq::Query::DiscoInfo(disco)), errors: vec![], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); Ok(None) } @@ -642,7 +672,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq get from {}", from); Ok(None) } // stanza::client::iq::Query::Bind(bind) => todo!(), @@ -662,7 +696,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: None, errors: vec![StanzaError::BadRequest.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to malformed iq query from {}", from); Ok(None) } @@ -713,7 +751,12 @@ pub async fn recv_iq<Fs: FileStore + Clone>( .handle_error(RosterError::PushReply(e.into()).into()) .await; } - Ok(Some(UpdateMessage::RosterUpdate(contact))) + let user = logic + .db() + .read_user(contact.user_jid.clone()) + .await + .map_err(|e| Into::<RosterError>::into(e))?; + Ok(Some(UpdateMessage::RosterUpdate(contact, user))) } } } else { @@ -727,7 +770,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } @@ -743,7 +790,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq set from {}", from); Ok(None) } @@ -759,7 +810,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } |