use std::str::FromStr;
use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
use sha1::{Digest, Sha1};
use stanza::{
client::{
Stanza,
iq::{self, Iq, IqType},
},
stanza_error::Error as StanzaError,
xep_0030::{self, info},
xep_0060::event::{Content, Event, ItemsType},
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
UpdateMessage, caps,
chat::{Body, Message},
error::{
AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
PresenceError, RosterError,
},
files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
user::User,
};
use super::ClientLogic;
pub async fn handle_stanza<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
) {
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
_ => {
if let Some(u) = u {
logic.handle_update(u).await
}
}
},
Err(e) => logic.handle_error(e).await,
}
}
pub async fn recv_message<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
stanza_message: stanza::client::message::Message,
) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
// body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
// if there is a body, should create chat message
if let Some(body) = stanza_message.body {
let message = Message {
id: stanza_message
.id
// TODO: proper id xep
.map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
.unwrap_or_else(|| Uuid::new_v4()),
from: from.as_bare(),
timestamp,
body: Body {
body: body.body.unwrap_or_default(),
},
delivery: None,
};
// save the message to the database
match logic.db().upsert_chat_and_user(&from).await {
Ok(_) => {
if let Err(e) = logic
.db()
.create_message_with_user_resource(
message.clone(),
from.clone(),
from.clone(),
)
.await
{
logic
.handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
.await;
error!("failed to upsert chat and user")
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
.await;
error!("failed to upsert chat and user")
}
};
let from_user = match logic.db().read_user(from.as_bare()).await {
Ok(u) => u,
Err(e) => {
error!("{}", e);
User {
jid: from.as_bare(),
nick: None,
avatar: None,
}
}
};
// update the client with the new message
logic
.update_sender()
.send(UpdateMessage::Message {
to: from.as_bare(),
from: from_user,
message,
})
.await;
}
if let Some(nick) = stanza_message.nick {
let nick = nick.0;
if nick.is_empty() {
match logic.db().delete_user_nick(from.as_bare()).await {
Ok(changed) => {
if changed {
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: None,
})
.await;
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
.await;
// if failed, send user update anyway
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: None,
})
.await;
}
}
} else {
match logic
.db()
.upsert_user_nick(from.as_bare(), nick.clone())
.await
{
Ok(changed) => {
if changed {
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: Some(nick),
})
.await;
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
.await;
// if failed, send user update anyway
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: Some(nick),
})
.await;
}
}
}
}
if let Some(event) = stanza_message.event {
match event {
Event::Items(items) => {
match items.node.as_str() {
"http://jabber.org/protocol/nick" => match items.items {
ItemsType::Item(items) => {
if let Some(item) = items.first() {
match &item.item {
Some(c) => match c {
Content::Nick(nick) => {
let nick = nick.0.clone();
if nick.is_empty() {
match logic
.db()
.delete_user_nick(from.as_bare())
.await
{
Ok(changed) => {
if changed {
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: None,
})
.await;
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(
MessageRecvError::NickUpdate(e),
))
.await;
// if failed, send user update anyway
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: None,
})
.await;
}
}
} else {
match logic
.db()
.upsert_user_nick(
from.as_bare(),
nick.clone(),
)
.await
{
Ok(changed) => {
if changed {
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: Some(nick),
})
.await;
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(
MessageRecvError::NickUpdate(e),
))
.await;
// if failed, send user update anyway
logic
.update_sender()
.send(UpdateMessage::NickChanged {
jid: from.as_bare(),
nick: Some(nick),
})
.await;
}
}
}
}
_ => {}
},
None => {}
}
}
}
_ => {}
},
"urn:xmpp:avatar:metadata" => {
match items.items {
ItemsType::Item(items) => {
if let Some(item) = items.first() {
debug!("found item");
match &item.item {
Some(Content::AvatarMetadata(metadata)) => {
debug!("found metadata");
// check if user avatar has been deleted
if let Some(metadata) = metadata
.info
.iter()
.find(|info| info.url.is_none())
{
debug!("checking if user avatar has changed");
// check if user avatar has changed
match logic
.db()
.upsert_user_avatar(
from.as_bare(),
metadata.id.clone(),
)
.await
{
Ok((changed, old_avatar)) => {
if changed {
if let Some(old_avatar) = old_avatar
{
if let Err(e) = logic
.file_store()
.delete(&old_avatar)
.await.map_err(|err| AvatarUpdateError::FileStore(err)) {
logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
}
}
}
match logic
.file_store()
.is_stored(&metadata.id)
.await
.map_err(|err| {
AvatarUpdateError::<Fs>::FileStore(
err,
)
}) {
Ok(false) => {
// get data
let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
match pep_item {
crate::pep::Item::AvatarData(data) => {
let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", "");
// TODO: these should all be in a separate avatarupdate function
debug!("got avatar data");
match BASE64_STANDARD.decode(data) {
Ok(data) => {
let mut hasher = Sha1::new();
hasher.update(&data);
let received_data_hash = hex::encode(hasher.finalize());
debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id);
if received_data_hash.to_lowercase() == metadata.id.to_lowercase() {
if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
}
if changed {
logic
.update_sender()
.send(
UpdateMessage::AvatarChanged {
jid: from.as_bare(),
id: Some(
metadata.id.clone(),
),
},
)
.await;
}
}
},
Err(e) => {
logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
},
}
},
_ => {
logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
}
}
}
Ok(true) => {
// just send the update
if changed {
logic
.update_sender()
.send(
UpdateMessage::AvatarChanged {
jid: from.as_bare(),
id: Some(
metadata.id.clone(),
),
},
)
.await;
}
}
Err(e) => {
logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
}
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(
MessageRecvError::AvatarUpdate(
AvatarUpdateError::Database(
e,
),
),
))
.await;
}
}
} else {
// delete avatar
match logic
.db()
.delete_user_avatar(from.as_bare())
.await
{
Ok((changed, old_avatar)) => {
if changed {
if let Some(old_avatar) = old_avatar
{
if let Err(e) = logic
.file_store()
.delete(&old_avatar)
.await.map_err(|err| AvatarUpdateError::FileStore(err)) {
logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
}
}
logic
.update_sender()
.send(
UpdateMessage::AvatarChanged {
jid: from.as_bare(),
id: None,
},
)
.await;
}
}
Err(e) => {
logic
.handle_error(Error::MessageRecv(
MessageRecvError::AvatarUpdate(
AvatarUpdateError::Database(
e,
),
),
))
.await;
}
}
}
// check if the new file is in the file store
// if not, retrieve from server and save in the file store (remember to check if the hash matches)
// send the avatar update
}
_ => {}
}
}
}
_ => {}
}
}
_ => {}
}
}
// Event::Collection(collection) => todo!(),
// Event::Configuration(configuration) => todo!(),
// Event::Delete(delete) => todo!(),
// Event::Purge(purge) => todo!(),
// Event::Subscription(subscription) => todo!(),
_ => {} // TODO: catch these catch-alls in some way
}
}
Ok(None)
// TODO: can this be more efficient?
} else {
Err(MessageRecvError::MissingFrom)
}
}
pub async fn recv_presence(
presence: stanza::client::presence::Presence,
) -> Result<Option<UpdateMessage>, PresenceError> {
if let Some(from) = presence.from {
match presence.r#type {
Some(r#type) => match r#type {
// error processing a presence from somebody
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
// TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
Err(PresenceError::StanzaError(
presence
.errors
.first()
.cloned()
.expect("error MUST have error"),
))
}
// should not happen (error to server)
stanza::client::presence::PresenceType::Probe => {
// TODO: should probably write an error and restart stream
Err(PresenceError::Unsupported)
}
stanza::client::presence::PresenceType::Subscribe => {
// may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
Ok(Some(UpdateMessage::SubscriptionRequest(from)))
}
stanza::client::presence::PresenceType::Unavailable => {
let offline = Offline {
status: presence.status.map(|status| status.status.0),
};
let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
Ok(Some(UpdateMessage::Presence {
from,
presence: Presence {
timestamp,
presence: PresenceType::Offline(offline),
},
}))
}
// for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
stanza::client::presence::PresenceType::Subscribed => Ok(None),
stanza::client::presence::PresenceType::Unsubscribe => Ok(None),
stanza::client::presence::PresenceType::Unsubscribed => Ok(None),
},
None => {
let online = Online {
show: presence.show.map(|show| match show {
stanza::client::presence::Show::Away => Show::Away,
stanza::client::presence::Show::Chat => Show::Chat,
stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
stanza::client::presence::Show::Xa => Show::ExtendedAway,
}),
status: presence.status.map(|status| status.status.0),
priority: presence.priority.map(|priority| priority.0),
};
let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
Ok(Some(UpdateMessage::Presence {
from,
presence: Presence {
timestamp,
presence: PresenceType::Online(online),
},
}))
}
}
} else {
Err(PresenceError::MissingFrom)
}
}
pub async fn recv_iq<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
if *to == *connection.jid() {
} else {
return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
let from = iq
.from
.clone()
.unwrap_or_else(|| connection.server().clone());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
logic
.pending()
.respond(Stanza::Iq(iq), id)
.await
.map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
let from = iq
.from
.clone()
.unwrap_or_else(|| connection.server().clone());
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::DiscoInfo(query) => {
info!("received disco#info request from {}", from);
let current_caps_node = caps::caps_node();
let disco: info::Query =
if query.node.is_none() || query.node == Some(current_caps_node) {
let mut info = caps::client_info();
info.node = query.node;
info.into()
} else {
match logic
.db()
.read_capabilities(&query.node.clone().unwrap())
.await
{
Ok(c) => match caps::decode_info_base64(c) {
Ok(mut i) => {
i.node = query.node;
i.into()
}
Err(_e) => {
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: Some(iq::Query::DiscoInfo(query)),
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
},
Err(_e) => {
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: Some(iq::Query::DiscoInfo(query)),
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
}
};
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Result,
lang: None,
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
_ => {
warn!("received unsupported iq get from {}", from);
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
// stanza::client::iq::Query::DiscoItems(query) => todo!(),
// stanza::client::iq::Query::Ping(ping) => todo!(),
// stanza::client::iq::Query::Roster(query) => todo!(),
// stanza::client::iq::Query::Unsupported => todo!(),
}
} else {
info!("received malformed iq query from {}", from);
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
}
stanza::client::iq::IqType::Set => {
let from = iq
.from
.clone()
.unwrap_or_else(|| connection.server().clone());
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::Roster(mut query) => {
// TODO: should only have one, otherwise send error
// if let Some(item) = query.items.pop() && query.items.len() == 1 {
if let Some(item) = query.items.pop() {
match item.subscription {
Some(stanza::roster::Subscription::Remove) => {
if let Err(e) =
logic.db().delete_contact(item.jid.clone()).await
{
logic
.handle_error(RosterError::Cache(e.into()).into())
.await;
}
Ok(Some(UpdateMessage::RosterDelete(item.jid)))
}
_ => {
let contact: Contact = item.into();
if let Err(e) = logic.db().upsert_contact(contact.clone()).await
{
logic
.handle_error(RosterError::Cache(e.into()).into())
.await;
}
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Result,
lang: None,
query: None,
errors: vec![],
};
if let Err(e) =
connection.write_handle().write(Stanza::Iq(iq)).await
{
logic
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
let user = logic
.db()
.read_user(contact.user_jid.clone())
.await
.map_err(|e| Into::<RosterError>::into(e))?;
Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
warn!("received malformed roster push");
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
// TODO: send unsupported to server
_ => {
warn!("received unsupported iq set from {}", from);
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
}
} else {
warn!("received malformed iq set from {}", from);
let iq = Iq {
from: Some(connection.jid().clone()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
lang: None,
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
connection
.write_handle()
.write(Stanza::Iq(iq))
.await
.map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
}
}
pub async fn process_stanza<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
) -> Result<Option<UpdateMessage>, Error<Fs>> {
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?),
// unreachable, always caught by lampada
// TODO: make cleaner than this in some way
Stanza::Error(error) => {
unreachable!()
}
// should this cause a stream restart?
Stanza::OtherContent(content) => {
Err(Error::UnrecognizedContent)
// TODO: send error to write_thread
}
};
update
}