use std::str::FromStr; use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, iq::{self, Iq, IqType}, }, stanza_error::Error as StanzaError, xep_0030::{self, info}, xep_0060::event::{Content, Event, ItemsType}, }; use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, error::{ AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError, }, files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, }; use super::ClientLogic; pub async fn handle_stanza( logic: ClientLogic, stanza: Stanza, connection: Connected, ) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { _ => { if let Some(u) = u { logic.handle_update(u).await } } }, Err(e) => logic.handle_error(e).await, } } pub async fn recv_message( logic: ClientLogic, stanza_message: stanza::client::message::Message, ) -> Result, MessageRecvError> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); // TODO: group chat messages // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { id: stanza_message .id // TODO: proper id xep .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) .unwrap_or_else(|| Uuid::new_v4()), from: from.as_bare(), timestamp, body: Body { body: body.body.unwrap_or_default(), }, delivery: None, }; // save the message to the database match logic.db().upsert_chat_and_user(&from).await { Ok(_) => { if let Err(e) = logic .db() .create_message_with_user_resource( message.clone(), from.clone(), from.clone(), ) .await { logic .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) .await; error!("failed to upsert chat and user") } } Err(e) => { logic .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) .await; error!("failed to upsert chat and user") } }; // update the client with the new message logic .update_sender() .send(UpdateMessage::Message { to: from.as_bare(), message, }) .await; } if let Some(nick) = stanza_message.nick { let nick = nick.0; if nick.is_empty() { match logic.db().delete_user_nick(from.as_bare()).await { Ok(changed) => { if changed { logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: None, }) .await; } } Err(e) => { logic .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) .await; // if failed, send user update anyway logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: None, }) .await; } } } else { match logic .db() .upsert_user_nick(from.as_bare(), nick.clone()) .await { Ok(changed) => { if changed { logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: Some(nick), }) .await; } } Err(e) => { logic .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) .await; // if failed, send user update anyway logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: Some(nick), }) .await; } } } } if let Some(event) = stanza_message.event { match event { Event::Items(items) => { match items.node.as_str() { "http://jabber.org/protocol/nick" => match items.items { ItemsType::Item(items) => { if let Some(item) = items.first() { match &item.item { Some(c) => match c { Content::Nick(nick) => { let nick = nick.0.clone(); if nick.is_empty() { match logic .db() .delete_user_nick(from.as_bare()) .await { Ok(changed) => { if changed { logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: None, }) .await; } } Err(e) => { logic .handle_error(Error::MessageRecv( MessageRecvError::NickUpdate(e), )) .await; // if failed, send user update anyway logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: None, }) .await; } } } else { match logic .db() .upsert_user_nick( from.as_bare(), nick.clone(), ) .await { Ok(changed) => { if changed { logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: Some(nick), }) .await; } } Err(e) => { logic .handle_error(Error::MessageRecv( MessageRecvError::NickUpdate(e), )) .await; // if failed, send user update anyway logic .update_sender() .send(UpdateMessage::NickChanged { jid: from.as_bare(), nick: Some(nick), }) .await; } } } } _ => {} }, None => {} } } } _ => {} }, "urn:xmpp:avatar:metadata" => { match items.items { ItemsType::Item(items) => { if let Some(item) = items.first() { debug!("found item"); match &item.item { Some(Content::AvatarMetadata(metadata)) => { debug!("found metadata"); // check if user avatar has been deleted if let Some(metadata) = metadata .info .iter() .find(|info| info.url.is_none()) { debug!("checking if user avatar has changed"); // check if user avatar has changed match logic .db() .upsert_user_avatar( from.as_bare(), metadata.id.clone(), ) .await { Ok((changed, old_avatar)) => { if changed { if let Some(old_avatar) = old_avatar { if let Err(e) = logic .file_store() .delete(&old_avatar) .await.map_err(|err| AvatarUpdateError::FileStore(err)) { logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; } } } match logic .file_store() .is_stored(&metadata.id) .await .map_err(|err| { AvatarUpdateError::::FileStore( err, ) }) { Ok(false) => { // get data let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::>::into(err))?; match pep_item { crate::pep::Item::AvatarData(data) => { let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", ""); // TODO: these should all be in a separate avatarupdate function debug!("got avatar data"); match BASE64_STANDARD.decode(data) { Ok(data) => { let mut hasher = Sha1::new(); hasher.update(&data); let received_data_hash = hex::encode(hasher.finalize()); debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id); if received_data_hash.to_lowercase() == metadata.id.to_lowercase() { if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; } if changed { logic .update_sender() .send( UpdateMessage::AvatarChanged { jid: from.as_bare(), id: Some( metadata.id.clone(), ), }, ) .await; } } }, Err(e) => { logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; }, } }, _ => { logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; } } } Ok(true) => { // just send the update if changed { logic .update_sender() .send( UpdateMessage::AvatarChanged { jid: from.as_bare(), id: Some( metadata.id.clone(), ), }, ) .await; } } Err(e) => { logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; } } } Err(e) => { logic .handle_error(Error::MessageRecv( MessageRecvError::AvatarUpdate( AvatarUpdateError::Database( e, ), ), )) .await; } } } else { // delete avatar match logic .db() .delete_user_avatar(from.as_bare()) .await { Ok((changed, old_avatar)) => { if changed { if let Some(old_avatar) = old_avatar { if let Err(e) = logic .file_store() .delete(&old_avatar) .await.map_err(|err| AvatarUpdateError::FileStore(err)) { logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; } } logic .update_sender() .send( UpdateMessage::AvatarChanged { jid: from.as_bare(), id: None, }, ) .await; } } Err(e) => { logic .handle_error(Error::MessageRecv( MessageRecvError::AvatarUpdate( AvatarUpdateError::Database( e, ), ), )) .await; } } } // check if the new file is in the file store // if not, retrieve from server and save in the file store (remember to check if the hash matches) // send the avatar update } _ => {} } } } _ => {} } } _ => {} } } // Event::Collection(collection) => todo!(), // Event::Configuration(configuration) => todo!(), // Event::Delete(delete) => todo!(), // Event::Purge(purge) => todo!(), // Event::Subscription(subscription) => todo!(), _ => {} // TODO: catch these catch-alls in some way } } Ok(None) // TODO: can this be more efficient? } else { Err(MessageRecvError::MissingFrom) } } pub async fn recv_presence( presence: stanza::client::presence::Presence, ) -> Result, PresenceError> { if let Some(from) = presence.from { match presence.r#type { Some(r#type) => match r#type { // error processing a presence from somebody stanza::client::presence::PresenceType::Error => { // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display Err(PresenceError::StanzaError( presence .errors .first() .cloned() .expect("error MUST have error"), )) } // should not happen (error to server) stanza::client::presence::PresenceType::Probe => { // TODO: should probably write an error and restart stream Err(PresenceError::Unsupported) } stanza::client::presence::PresenceType::Subscribe => { // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event Ok(Some(UpdateMessage::SubscriptionRequest(from))) } stanza::client::presence::PresenceType::Unavailable => { let offline = Offline { status: presence.status.map(|status| status.status.0), }; let timestamp = presence .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); Ok(Some(UpdateMessage::Presence { from, presence: Presence { timestamp, presence: PresenceType::Offline(offline), }, })) } // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. stanza::client::presence::PresenceType::Subscribed => Ok(None), stanza::client::presence::PresenceType::Unsubscribe => Ok(None), stanza::client::presence::PresenceType::Unsubscribed => Ok(None), }, None => { let online = Online { show: presence.show.map(|show| match show { stanza::client::presence::Show::Away => Show::Away, stanza::client::presence::Show::Chat => Show::Chat, stanza::client::presence::Show::Dnd => Show::DoNotDisturb, stanza::client::presence::Show::Xa => Show::ExtendedAway, }), status: presence.status.map(|status| status.status.0), priority: presence.priority.map(|priority| priority.0), }; let timestamp = presence .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); Ok(Some(UpdateMessage::Presence { from, presence: Presence { timestamp, presence: PresenceType::Online(online), }, })) } } } else { Err(PresenceError::MissingFrom) } } pub async fn recv_iq( logic: ClientLogic, connection: Connected, iq: Iq, ) -> Result, IqError> { if let Some(to) = &iq.to { if *to == *connection.jid() { } else { return Err(IqError::IncorrectAddressee(to.clone())); } } match iq.r#type { stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { let from = iq .from .clone() .unwrap_or_else(|| connection.server().clone()); let id = iq.id.clone(); debug!("received iq result with id `{}` from {}", id, from); logic.pending().respond(Stanza::Iq(iq), id).await?; Ok(None) } stanza::client::iq::IqType::Get => { let from = iq .from .clone() .unwrap_or_else(|| connection.server().clone()); if let Some(query) = iq.query { match query { stanza::client::iq::Query::DiscoInfo(query) => { info!("received disco#info request from {}", from); let current_caps_node = caps::caps_node(); let disco: info::Query = if query.node.is_none() || query.node == Some(current_caps_node) { let mut info = caps::client_info(); info.node = query.node; info.into() } else { match logic .db() .read_capabilities(&query.node.clone().unwrap()) .await { Ok(c) => match caps::decode_info_base64(c) { Ok(mut i) => { i.node = query.node; i.into() } Err(_e) => { let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: Some(iq::Query::DiscoInfo(query)), errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error connection.write_handle().write(Stanza::Iq(iq)).await?; info!("replied to disco#info request from {}", from); return Ok(None); } }, Err(_e) => { let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: Some(iq::Query::DiscoInfo(query)), errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error connection.write_handle().write(Stanza::Iq(iq)).await?; info!("replied to disco#info request from {}", from); return Ok(None); } } }; let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Result, lang: None, query: Some(iq::Query::DiscoInfo(disco)), errors: vec![], }; connection.write_handle().write(Stanza::Iq(iq)).await?; info!("replied to disco#info request from {}", from); Ok(None) } _ => { warn!("received unsupported iq get from {}", from); let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; connection.write_handle().write(Stanza::Iq(iq)).await?; warn!("replied to unsupported iq get from {}", from); Ok(None) } // stanza::client::iq::Query::Bind(bind) => todo!(), // stanza::client::iq::Query::DiscoItems(query) => todo!(), // stanza::client::iq::Query::Ping(ping) => todo!(), // stanza::client::iq::Query::Roster(query) => todo!(), // stanza::client::iq::Query::Unsupported => todo!(), } } else { info!("received malformed iq query from {}", from); let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: None, errors: vec![StanzaError::BadRequest.into()], }; connection.write_handle().write(Stanza::Iq(iq)).await?; info!("replied to malformed iq query from {}", from); Ok(None) } } stanza::client::iq::IqType::Set => { let from = iq .from .clone() .unwrap_or_else(|| connection.server().clone()); if let Some(query) = iq.query { match query { stanza::client::iq::Query::Roster(mut query) => { // TODO: should only have one, otherwise send error // if let Some(item) = query.items.pop() && query.items.len() == 1 { if let Some(item) = query.items.pop() { match item.subscription { Some(stanza::roster::Subscription::Remove) => { if let Err(e) = logic.db().delete_contact(item.jid.clone()).await { logic .handle_error(RosterError::Cache(e.into()).into()) .await; } Ok(Some(UpdateMessage::RosterDelete(item.jid))) } _ => { let contact: Contact = item.into(); if let Err(e) = logic.db().upsert_contact(contact.clone()).await { logic .handle_error(RosterError::Cache(e.into()).into()) .await; } let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Result, lang: None, query: None, errors: vec![], }; if let Err(e) = connection.write_handle().write(Stanza::Iq(iq)).await { logic .handle_error(RosterError::PushReply(e.into()).into()) .await; } Ok(Some(UpdateMessage::RosterUpdate(contact))) } } } else { warn!("received malformed roster push"); let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: None, errors: vec![StanzaError::NotAcceptable.into()], }; connection.write_handle().write(Stanza::Iq(iq)).await?; Ok(None) } } // TODO: send unsupported to server _ => { warn!("received unsupported iq set from {}", from); let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; connection.write_handle().write(Stanza::Iq(iq)).await?; warn!("replied to unsupported iq set from {}", from); Ok(None) } } } else { warn!("received malformed iq set from {}", from); let iq = Iq { from: Some(connection.jid().clone()), id: iq.id, to: iq.from, r#type: IqType::Error, lang: None, query: None, errors: vec![StanzaError::NotAcceptable.into()], }; connection.write_handle().write(Stanza::Iq(iq)).await?; Ok(None) } } } } pub async fn process_stanza( logic: ClientLogic, stanza: Stanza, connection: Connected, ) -> Result, Error> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?), // unreachable, always caught by lampada // TODO: make cleaner than this in some way Stanza::Error(error) => { unreachable!() } // should this cause a stream restart? Stanza::OtherContent(content) => { Err(Error::UnrecognizedContent) // TODO: send error to write_thread } }; update }