diff options
Diffstat (limited to 'filamento/src/logic/process_stanza.rs')
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 495 |
1 files changed, 448 insertions, 47 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 182fb43..cdaff97 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@ use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, @@ -9,6 +11,7 @@ use stanza::{ }, stanza_error::Error as StanzaError, xep_0030::{self, info}, + xep_0060::event::{Content, Event, ItemsType}, }; use tracing::{debug, error, info, warn}; use uuid::Uuid; @@ -16,14 +19,23 @@ use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, - error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{ + AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError, + PresenceError, RosterError, + }, + files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, + user::User, }; use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + stanza: Stanza, + connection: Connected, +) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { @@ -37,10 +49,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne } } -pub async fn recv_message( - logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message @@ -49,6 +61,7 @@ pub async fn recv_message( .unwrap_or_else(|| Utc::now()); // TODO: group chat messages + // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { @@ -66,45 +79,392 @@ pub async fn recv_message( }; // save the message to the database - logic.db().upsert_chat_and_user(&from).await?; - if let Err(e) = logic - .db() - .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - } + match logic.db().upsert_chat_and_user(&from).await { + Ok(_) => { + if let Err(e) = logic + .db() + .create_message_with_user_resource( + message.clone(), + from.clone(), + from.clone(), + ) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + error!("failed to upsert chat and user") + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + error!("failed to upsert chat and user") + } + }; + + let from_user = match logic.db().read_user(from.as_bare()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: from.as_bare(), + nick: None, + avatar: None, + } + } + }; // update the client with the new message logic .update_sender() .send(UpdateMessage::Message { to: from.as_bare(), + from: from_user, message, }) .await; } if let Some(nick) = stanza_message.nick { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) - .await; + let nick = nick.0; + if nick.is_empty() { + match logic.db().delete_user_nick(from.as_bare()).await { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick(from.as_bare(), nick.clone()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } } + } - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0, - }) - .await; + if let Some(event) = stanza_message.event { + match event { + Event::Items(items) => { + match items.node.as_str() { + "http://jabber.org/protocol/nick" => match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(c) => match c { + Content::Nick(nick) => { + let nick = nick.0.clone(); + if nick.is_empty() { + match logic + .db() + .delete_user_nick(from.as_bare()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick( + from.as_bare(), + nick.clone(), + ) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + } + } + _ => {} + }, + None => {} + } + } + } + _ => {} + }, + "urn:xmpp:avatar:metadata" => { + match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + debug!("found item"); + match &item.item { + Some(Content::AvatarMetadata(metadata)) => { + debug!("found metadata"); + // check if user avatar has been deleted + if let Some(metadata) = metadata + .info + .iter() + .find(|info| info.url.is_none()) + { + debug!("checking if user avatar has changed"); + // check if user avatar has changed + match logic + .db() + .upsert_user_avatar( + from.as_bare(), + metadata.id.clone(), + ) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + } + + match logic + .file_store() + .is_stored(&metadata.id) + .await + .map_err(|err| { + AvatarUpdateError::<Fs>::FileStore( + err, + ) + }) { + Ok(false) => { + // get data + let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; + match pep_item { + crate::pep::Item::AvatarData(data) => { + let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", ""); + // TODO: these should all be in a separate avatarupdate function + debug!("got avatar data"); + match BASE64_STANDARD.decode(data) { + Ok(data) => { + let mut hasher = Sha1::new(); + hasher.update(&data); + let received_data_hash = hex::encode(hasher.finalize()); + debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id); + if received_data_hash.to_lowercase() == metadata.id.to_lowercase() { + if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; + } + if changed { + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + } + }, + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; + }, + } + }, + _ => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; + } + } + } + Ok(true) => { + // just send the update + if changed { + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + } + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; + } + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } else { + // delete avatar + match logic + .db() + .delete_user_avatar(from.as_bare()) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: None, + }, + ) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } + // check if the new file is in the file store + // if not, retrieve from server and save in the file store (remember to check if the hash matches) + // send the avatar update + } + _ => {} + } + } + } + _ => {} + } + } + _ => {} + } + } + // Event::Collection(collection) => todo!(), + // Event::Configuration(configuration) => todo!(), + // Event::Delete(delete) => todo!(), + // Event::Purge(purge) => todo!(), + // Event::Subscription(subscription) => todo!(), + _ => {} // TODO: catch these catch-alls in some way + } } Ok(None) @@ -191,15 +551,15 @@ pub async fn recv_presence( } } -pub async fn recv_iq( - logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, connection: Connected, iq: Iq, -) -> Result<Option<UpdateMessage>, IqError> { +) -> Result<Option<UpdateMessage>, IqProcessError> { if let Some(to) = &iq.to { if *to == *connection.jid() { } else { - return Err(IqError::IncorrectAddressee(to.clone())); + return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone()))); } } match iq.r#type { @@ -210,7 +570,11 @@ pub async fn recv_iq( .unwrap_or_else(|| connection.server().clone()); let id = iq.id.clone(); debug!("received iq result with id `{}` from {}", id, from); - logic.pending().respond(Stanza::Iq(iq), id).await?; + logic + .pending() + .respond(Stanza::Iq(iq), id) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } stanza::client::iq::IqType::Get => { @@ -250,7 +614,11 @@ pub async fn recv_iq( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -266,7 +634,11 @@ pub async fn recv_iq( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -281,7 +653,11 @@ pub async fn recv_iq( query: Some(iq::Query::DiscoInfo(disco)), errors: vec![], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); Ok(None) } @@ -296,7 +672,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq get from {}", from); Ok(None) } // stanza::client::iq::Query::Bind(bind) => todo!(), @@ -316,7 +696,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::BadRequest.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to malformed iq query from {}", from); Ok(None) } @@ -367,7 +751,12 @@ pub async fn recv_iq( .handle_error(RosterError::PushReply(e.into()).into()) .await; } - Ok(Some(UpdateMessage::RosterUpdate(contact))) + let user = logic + .db() + .read_user(contact.user_jid.clone()) + .await + .map_err(|e| Into::<RosterError>::into(e))?; + Ok(Some(UpdateMessage::RosterUpdate(contact, user))) } } } else { @@ -381,7 +770,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } @@ -397,7 +790,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq set from {}", from); Ok(None) } @@ -413,18 +810,22 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } } } -pub async fn process_stanza( - logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza: Stanza, connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), |