use std::{io::Cursor, time::Duration}; use base64::{prelude::BASE64_STANDARD, Engine}; use chrono::Utc; use image::ImageReader; use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; use sha1::{Digest, Sha1}; use stanza::{ client::{ iq::{self, Iq, IqType, Query}, Stanza }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay }; use tokio::sync::oneshot; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage }; use super::{ local_only::{ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user }, ClientLogic }; pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} Err(e) => logic.handle_error(e).await, } } pub async fn handle_get_roster( logic: &ClientLogic, connection: Connected, ) -> Result, RosterError> { let iq_id = Uuid::new_v4().to_string(); let stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: Vec::new(), })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, stanza, iq_id.clone()) .await?; // TODO: timeout match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), errors: _, }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec = items.into_iter().map(|item| item.into()).collect(); if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { logic .handle_error(Error::Roster(RosterError::Cache(e.into()))) .await; }; Ok(contacts) } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } // this can't query the client... otherwise there is a hold-up and the connection can't complete pub async fn handle_get_roster_with_users( logic: &ClientLogic, connection: Connected, ) -> Result, RosterError> { let iq_id = Uuid::new_v4().to_string(); let stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: Vec::new(), })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, stanza, iq_id.clone()) .await?; // TODO: timeout match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), errors: _, }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec = items.into_iter().map(|item| item.into()).collect(); if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { logic .handle_error(Error::Roster(RosterError::Cache(e.into()))) .await; }; let mut users = Vec::new(); for contact in &contacts { let user = logic.db().read_user(contact.user_jid.clone()).await?; users.push(user); } Ok(contacts.into_iter().zip(users).collect()) } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_add_contact( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: None, groups: Vec::new(), }], })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_buddy_request( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), nick, ..Default::default() }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribed), ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_subscription_request( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { // TODO: i should probably have builders let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), nick, ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_buddy_request( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribed), ..Default::default() }); connection.write_handle().write(presence).await?; let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), nick, ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_subscription_request( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), nick, ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unsubscribe_from_contact( connection: Connected, jid: JID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), ..Default::default() }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_delete_contact( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: Some(stanza::roster::Subscription::Remove), groups: Vec::new(), }], })), errors: Vec::new(), }); let result = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, // don't really need to check matching id as request() does this anyway }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_update_contact( logic: &ClientLogic, connection: Connected, jid: JID, contact_update: ContactUpdate, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let groups = Vec::from_iter( contact_update .groups .into_iter() .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: contact_update.name, subscription: None, groups, }], })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_set_status( logic: &ClientLogic, connection: Connected, online: Online, ) -> Result<(), StatusError> { logic.db().upsert_cached_status(online.clone()).await?; connection .write_handle() .write(Stanza::Presence(online.into_stanza(None))) .await?; Ok(()) } pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); let have_chatted = match logic.db().upsert_chat_and_user(&jid).await { Ok(have_chatted) => { have_chatted }, Err(e) => { error!("{}", e); false }, }; let nick; let mark_chat_as_chatted; if have_chatted == false { match logic.db.read_user(logic.bare_jid.clone()).await { Ok(u) => { nick = u.nick.map(|nick| Nick(nick)); mark_chat_as_chatted = true; } Err(e) => { logic .handle_error(MessageSendError::GetUserDetails(e.into()).into()) .await; nick = None; mark_chat_as_chatted = false; } } } else { nick = None; mark_chat_as_chatted = false; } // generate message struct let id = Uuid::new_v4(); let timestamp = Utc::now(); let message = Message { id, from: connection.jid().as_bare(), body: body.clone(), timestamp, delivery: Some(Delivery::Sending), }; // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send // TODO: mark these as potentially failed upon client launch if let Err(e) = logic .db() .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone()) .await { // TODO: should these really be handle_error or just the error macro? logic .handle_error(MessageSendError::MessageHistory(e.into()).into()) .await; } let from = match logic.db().read_user(logic.bare_jid.clone()).await { Ok(u) => u, Err(e) => { error!("{}", e); User { jid: logic.bare_jid.clone(), nick: None, avatar: None, } }, }; // tell the client a message is being sent logic .update_sender() .send(UpdateMessage::Message { to: jid.as_bare(), message, from, }) .await; // prepare the message stanza let message_stanza = Stanza::Message(stanza::client::message::Message { from: Some(connection.jid().clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? body: Some(stanza::client::message::Body { lang: None, body: Some(body.body.clone()), }), // include delay to have a consistent timestamp between server and client delay: Some(Delay { from: None, stamp: timestamp, }), nick, ..Default::default() }); // send the message let result = connection .write_handle() .write(message_stanza.clone()) .await; match result { Ok(_) => { info!("sent message: {:?}", message_stanza); logic .update_sender() .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Written, chat: jid.clone(), }) .await; if mark_chat_as_chatted { debug!("marking chat as chatted"); if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { logic .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) .await; } } } Err(e) => { logic .update_sender() .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Failed, chat: jid, }) .await; logic.handle_error(MessageSendError::Write(e).into()).await; } } } pub async fn handle_send_presence( connection: Connected, jid: Option, presence: PresenceType, ) -> Result<(), WriteError> { let mut presence: stanza::client::presence::Presence = presence.into(); presence.to = jid; connection .write_handle() .write(Stanza::Presence(presence)) .await?; Ok(()) } pub async fn handle_disco_info( logic: &ClientLogic, connection: Connected, jid: Option, node: Option, ) -> Result { let id = Uuid::new_v4().to_string(); let request = Iq { from: Some(connection.jid().clone()), id: id.clone(), to: jid.clone(), r#type: IqType::Get, lang: None, query: Some(Query::DiscoInfo(info::Query { node, features: Vec::new(), identities: Vec::new(), extensions: Vec::new(), })), errors: Vec::new(), }; match logic .pending() .request(&connection, Stanza::Iq(request), id) .await? { Stanza::Iq(Iq { from, r#type, query, mut errors, .. // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == jid || { if jid == None { from == Some(connection.jid().as_bare()) } else { false } } { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::DiscoInfo(info) => Ok(info.into()), q => Err(DiscoError::MismatchedQuery(q)), } } else { Err(DiscoError::MissingQuery) } } IqType::Error => { if let Some(error) = errors.pop() { Err(error.into()) } else { Err(DiscoError::MissingError) } } _ => unreachable!(), } } else { Err(DiscoError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } s => Err(DiscoError::UnexpectedStanza(s)), } } pub async fn handle_disco_items( logic: &ClientLogic, connection: Connected, jid: Option, node: Option, ) -> Result { let id = Uuid::new_v4().to_string(); let request = Iq { from: Some(connection.jid().clone()), id: id.clone(), to: jid.clone(), r#type: IqType::Get, lang: None, query: Some(Query::DiscoItems(items::Query { node, items: Vec::new(), })), errors: Vec::new(), }; match logic .pending() .request(&connection, Stanza::Iq(request), id) .await? { Stanza::Iq(Iq { from, r#type, query, mut errors, .. // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == jid || { if jid == None { from == Some(connection.jid().as_bare()) } else { false } } { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::DiscoItems(items) => Ok(items.into()), q => Err(DiscoError::MismatchedQuery(q)), } } else { Err(DiscoError::MissingQuery) } } IqType::Error => { if let Some(error) = errors.pop() { Err(error.into()) } else { Err(DiscoError::MissingError) } } _ => unreachable!(), } } else { Err(DiscoError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } s => Err(DiscoError::UnexpectedStanza(s)), } } pub async fn handle_publish_pep_item( logic: &ClientLogic, connection: Connected, item: pep::Item, node: String, ) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let publish = match item { pep::Item::Nick(n) => { if let Some(n) = n { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::Nick(Nick(n))), ..Default::default() }], } } else { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::Nick(Nick("".to_string()))), ..Default::default() }] } } }, pep::Item::AvatarMetadata(metadata) => { if let Some(metadata) = metadata { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), id: Some(metadata.hash), ..Default::default() }]} } else { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), ..Default::default() }]} } }, pep::Item::AvatarData(data) => { if let Some(data) = data { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), id: Some(data.hash), ..Default::default() }] } } else { pubsub::Publish { node, items: vec![pubsub::Item { item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), ..Default::default() }]} } }, }; let request = Iq { from: Some(connection.jid().clone()), id: id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(Query::Pubsub(Pubsub::Publish(publish, None))), errors: Vec::new(), }; match logic .pending() .request(&connection, Stanza::Iq(request), id) .await? { Stanza::Iq(Iq { from, r#type, query, errors, .. // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == None || from == Some(connection.jid().as_bare()) { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::Pubsub(_) => Ok(()), q => Err(PEPError::MismatchedQuery(q)), } } else { Err(PEPError::MissingQuery) } } IqType::Error => { Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } s => Err(PEPError::UnexpectedStanza(s)), } } pub async fn handle_get_pep_item(logic: &ClientLogic, connection: Connected, jid: Option, node: String, id: String) -> Result { let stanza_id = Uuid::new_v4().to_string(); let request = Iq { from: Some(connection.jid().clone()), id: stanza_id.clone(), to: jid.clone(), r#type: IqType::Get, lang: None, query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { max_items: None, node, subid: None, items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], }))), errors: Vec::new(), }; match logic .pending() .request(&connection, Stanza::Iq(request), stanza_id) .await? { Stanza::Iq(Iq { from, r#type, query, errors, .. // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == jid || { if jid == None { from == Some(connection.jid().as_bare()) } else { false } } { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::Pubsub(Pubsub::Items(mut items)) => { if let Some(item) = items.items.pop() { if item.id == Some(id.clone()) { match item.item.ok_or(PEPError::MissingItem)? { pubsub::Content::Nick(nick) => { if nick.0.is_empty() { Ok(pep::Item::Nick(None)) } else { Ok(pep::Item::Nick(Some(nick.0))) } }, pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), } } else { Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) } } else { Err(PEPError::MissingItem) } }, q => Err(PEPError::MismatchedQuery(q)), } } else { Err(PEPError::MissingQuery) } } IqType::Error => { Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { // TODO: include expected entity Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } s => Err(PEPError::UnexpectedStanza(s)), } } pub async fn handle_change_nick(logic: &ClientLogic, nick: Option) -> Result<(), NickError> { logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; Ok(()) } pub async fn handle_change_avatar(logic: &ClientLogic, img_data: Option>) -> Result<(), AvatarPublishError> { match img_data { // set avatar Some(data) => { // load the image data and guess the format let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; // convert the image to png; let mut data_png = Vec::new(); let image = image.resize(192, 192, image::imageops::FilterType::Nearest); image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?; // calculate the length of the data in bytes. let bytes = data_png.len().try_into()?; // calculate sha1 hash of the data let mut sha1 = Sha1::new(); sha1.update(&data_png); let sha1_result = sha1.finalize(); let hash = hex::encode(sha1_result); // encode the image data as base64 let data_b64 = BASE64_STANDARD.encode(data_png.clone()); // publish the data to the data node logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; // publish the metadata to the metadata node logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; // if everything went well, save the data to the disk. if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? } // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. // TODO: should the node be purged? Ok(()) }, // remove avatar None => { logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; Ok(()) }, } } pub async fn handle_delete_pep_node( logic: &ClientLogic, connection: Connected, node: String, ) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let request = Iq { from: Some(connection.jid().clone()), id: id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), errors: Vec::new(), }; match logic .pending() .request(&connection, Stanza::Iq(request), id) .await? { Stanza::Iq(Iq { from, r#type, query, errors, .. // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == None || from == Some(connection.jid().as_bare()) { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::PubsubOwner(_) => Ok(()), q => Err(PEPError::MismatchedQuery(q)), } } else { // Err(PEPError::MissingQuery) Ok(()) } } IqType::Error => { Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } s => Err(PEPError::UnexpectedStanza(s)), } } // TODO: could probably macro-ise? pub async fn handle_online_result( logic: &ClientLogic, command: Command, connection: Connected, ) -> Result<(), Error> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; let _ = result_sender.send(roster); } Command::GetRosterWithUsers(result_sender) => { let roster = handle_get_roster_with_users(logic, connection).await; let _ = result_sender.send(roster); } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; let _ = sender.send(chats); } Command::GetChatsOrdered(sender) => { let chats = handle_get_chats_ordered(logic).await; let _ = sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { let chats = handle_get_chats_ordered_with_latest_messages(logic).await; let _ = sender.send(chats); } Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; sender.send(chats); } Command::GetChat(jid, sender) => { let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); } Command::GetMessages(jid, sender) => { let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); } Command::GetMessagesWithUsers(jid, sender) => { let messages = handle_get_messages_with_users(logic, jid).await; sender.send(messages); } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; let _ = sender.send(result); } Command::DeleteMessage(uuid, sender) => { let result = handle_delete_messaage(logic, uuid).await; let _ = sender.send(result); } Command::GetUser(jid, sender) => { let user = handle_get_user(logic, jid).await; let _ = sender.send(user); } Command::AddContact(jid, sender) => { let result = handle_add_contact(logic, connection, jid).await; let _ = sender.send(result); } Command::BuddyRequest(jid, sender) => { let result = handle_buddy_request(logic, connection, jid).await; let _ = sender.send(result); } Command::SubscriptionRequest(jid, sender) => { let result = handle_subscription_request(logic, connection, jid).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { let result = handle_accept_buddy_request(logic, connection, jid).await; let _ = sender.send(result); } Command::AcceptSubscriptionRequest(jid, sender) => { let result = handle_accept_subscription_request(logic, connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { let result = handle_unsubscribe_from_contact(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeContact(jid, sender) => { let result = handle_unsubscribe_contact(connection, jid).await; let _ = sender.send(result); } Command::UnfriendContact(jid, sender) => { let result = handle_unfriend_contact(connection, jid).await; let _ = sender.send(result); } Command::DeleteContact(jid, sender) => { let result = handle_delete_contact(logic, connection, jid).await; let _ = sender.send(result); } Command::UpdateContact(jid, contact_update, sender) => { let result = handle_update_contact(logic, connection, jid, contact_update).await; let _ = sender.send(result); } Command::SetStatus(online, sender) => { let result = handle_set_status(logic, connection, online).await; let _ = sender.send(result); } Command::SendMessage(jid, body) => { handle_send_message(logic, connection, jid, body).await; } Command::SendPresence(jid, presence, sender) => { let result = handle_send_presence(connection, jid, presence).await; let _ = sender.send(result); } Command::DiscoInfo(jid, node, sender) => { let result = handle_disco_info(logic, connection, jid, node).await; let _ = sender.send(result); } Command::DiscoItems(jid, node, sender) => { let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } Command::PublishPEPItem { item, node, sender } => { let result = handle_publish_pep_item(logic, connection, item, node).await; let _ = sender.send(result); } Command::ChangeNick(nick, sender) => { let result = handle_change_nick(logic, nick).await; let _ = sender.send(result); } Command::ChangeAvatar(img_data, sender) => { let result = handle_change_avatar(logic, img_data).await; let _ = sender.send(result); }, Command::DeletePEPNode { node, sender } => { let result = handle_delete_pep_node(logic, connection, node).await; let _ = sender.send(result); }, Command::GetPEPItem { node, sender, jid, id } => { let result = handle_get_pep_item(logic, connection, jid, node, id).await; let _ = sender.send(result); }, } Ok(()) }