aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/process_stanza.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/process_stanza.rs')
-rw-r--r--filamento/src/logic/process_stanza.rs495
1 files changed, 448 insertions, 47 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 182fb43..cdaff97 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -1,7 +1,9 @@
use std::str::FromStr;
+use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
Stanza,
@@ -9,6 +11,7 @@ use stanza::{
},
stanza_error::Error as StanzaError,
xep_0030::{self, info},
+ xep_0060::event::{Content, Event, ItemsType},
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
@@ -16,14 +19,23 @@ use uuid::Uuid;
use crate::{
UpdateMessage, caps,
chat::{Body, Message},
- error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{
+ AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
+ PresenceError, RosterError,
+ },
+ files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
+ user::User,
};
use super::ClientLogic;
-pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+pub async fn handle_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ stanza: Stanza,
+ connection: Connected,
+) {
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
@@ -37,10 +49,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne
}
}
-pub async fn recv_message(
- logic: ClientLogic,
+pub async fn recv_message<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza_message: stanza::client::message::Message,
-) -> Result<Option<UpdateMessage>, MessageRecvError> {
+) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
@@ -49,6 +61,7 @@ pub async fn recv_message(
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
+ // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
// if there is a body, should create chat message
if let Some(body) = stanza_message.body {
let message = Message {
@@ -66,45 +79,392 @@ pub async fn recv_message(
};
// save the message to the database
- logic.db().upsert_chat_and_user(&from).await?;
- if let Err(e) = logic
- .db()
- .create_message_with_user_resource(message.clone(), from.clone(), from.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- }
+ match logic.db().upsert_chat_and_user(&from).await {
+ Ok(_) => {
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(
+ message.clone(),
+ from.clone(),
+ from.clone(),
+ )
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ error!("failed to upsert chat and user")
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ error!("failed to upsert chat and user")
+ }
+ };
+
+ let from_user = match logic.db().read_user(from.as_bare()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: from.as_bare(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
// update the client with the new message
logic
.update_sender()
.send(UpdateMessage::Message {
to: from.as_bare(),
+ from: from_user,
message,
})
.await;
}
if let Some(nick) = stanza_message.nick {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
- .await;
+ let nick = nick.0;
+ if nick.is_empty() {
+ match logic.db().delete_user_nick(from.as_bare()).await {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(from.as_bare(), nick.clone())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
}
+ }
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0,
- })
- .await;
+ if let Some(event) = stanza_message.event {
+ match event {
+ Event::Items(items) => {
+ match items.node.as_str() {
+ "http://jabber.org/protocol/nick" => match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(c) => match c {
+ Content::Nick(nick) => {
+ let nick = nick.0.clone();
+ if nick.is_empty() {
+ match logic
+ .db()
+ .delete_user_nick(from.as_bare())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(
+ from.as_bare(),
+ nick.clone(),
+ )
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ }
+ }
+ _ => {}
+ },
+ None => {}
+ }
+ }
+ }
+ _ => {}
+ },
+ "urn:xmpp:avatar:metadata" => {
+ match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ debug!("found item");
+ match &item.item {
+ Some(Content::AvatarMetadata(metadata)) => {
+ debug!("found metadata");
+ // check if user avatar has been deleted
+ if let Some(metadata) = metadata
+ .info
+ .iter()
+ .find(|info| info.url.is_none())
+ {
+ debug!("checking if user avatar has changed");
+ // check if user avatar has changed
+ match logic
+ .db()
+ .upsert_user_avatar(
+ from.as_bare(),
+ metadata.id.clone(),
+ )
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ }
+
+ match logic
+ .file_store()
+ .is_stored(&metadata.id)
+ .await
+ .map_err(|err| {
+ AvatarUpdateError::<Fs>::FileStore(
+ err,
+ )
+ }) {
+ Ok(false) => {
+ // get data
+ let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
+ match pep_item {
+ crate::pep::Item::AvatarData(data) => {
+ let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", "");
+ // TODO: these should all be in a separate avatarupdate function
+ debug!("got avatar data");
+ match BASE64_STANDARD.decode(data) {
+ Ok(data) => {
+ let mut hasher = Sha1::new();
+ hasher.update(&data);
+ let received_data_hash = hex::encode(hasher.finalize());
+ debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id);
+ if received_data_hash.to_lowercase() == metadata.id.to_lowercase() {
+ if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
+ }
+ if changed {
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ }
+ },
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
+ },
+ }
+ },
+ _ => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
+ }
+ }
+ }
+ Ok(true) => {
+ // just send the update
+ if changed {
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ } else {
+ // delete avatar
+ match logic
+ .db()
+ .delete_user_avatar(from.as_bare())
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: None,
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ }
+ // check if the new file is in the file store
+ // if not, retrieve from server and save in the file store (remember to check if the hash matches)
+ // send the avatar update
+ }
+ _ => {}
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+ _ => {}
+ }
+ }
+ // Event::Collection(collection) => todo!(),
+ // Event::Configuration(configuration) => todo!(),
+ // Event::Delete(delete) => todo!(),
+ // Event::Purge(purge) => todo!(),
+ // Event::Subscription(subscription) => todo!(),
+ _ => {} // TODO: catch these catch-alls in some way
+ }
}
Ok(None)
@@ -191,15 +551,15 @@ pub async fn recv_presence(
}
}
-pub async fn recv_iq(
- logic: ClientLogic,
+pub async fn recv_iq<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
-) -> Result<Option<UpdateMessage>, IqError> {
+) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
if *to == *connection.jid() {
} else {
- return Err(IqError::IncorrectAddressee(to.clone()));
+ return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
@@ -210,7 +570,11 @@ pub async fn recv_iq(
.unwrap_or_else(|| connection.server().clone());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
- logic.pending().respond(Stanza::Iq(iq), id).await?;
+ logic
+ .pending()
+ .respond(Stanza::Iq(iq), id)
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
@@ -250,7 +614,11 @@ pub async fn recv_iq(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -266,7 +634,11 @@ pub async fn recv_iq(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -281,7 +653,11 @@ pub async fn recv_iq(
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
@@ -296,7 +672,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
@@ -316,7 +696,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
@@ -367,7 +751,12 @@ pub async fn recv_iq(
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
- Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ let user = logic
+ .db()
+ .read_user(contact.user_jid.clone())
+ .await
+ .map_err(|e| Into::<RosterError>::into(e))?;
+ Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
@@ -381,7 +770,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -397,7 +790,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
@@ -413,18 +810,22 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
}
}
-pub async fn process_stanza(
- logic: ClientLogic,
+pub async fn process_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
-) -> Result<Option<UpdateMessage>, Error> {
+) -> Result<Option<UpdateMessage>, Error<Fs>> {
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),