use chrono::Utc; use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; use stanza::{ client::{ Stanza, iq::{self, Iq, IqType}, }, xep_0203::Delay, }; use tokio::sync::oneshot; use tracing::{debug, info}; use uuid::Uuid; use crate::{ Command, UpdateMessage, chat::{Body, Message}, error::{DatabaseError, Error, MessageSendError, RosterError, StatusError}, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, }; use super::{ ClientLogic, local_only::{ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_messages, handle_get_user, }, }; pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} Err(e) => logic.handle_error(e).await, } } pub async fn handle_get_roster( logic: &ClientLogic, connection: Connected, ) -> Result, RosterError> { let iq_id = Uuid::new_v4().to_string(); let stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: Vec::new(), })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, stanza, iq_id.clone()) .await?; // TODO: timeout match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), errors: _, }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec = items.into_iter().map(|item| item.into()).collect(); if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { logic .handle_error(Error::Roster(RosterError::Cache(e.into()))) .await; }; Ok(contacts) } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_add_contact( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: None, groups: Vec::new(), }], })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_subscription_request( connection: Connected, jid: JID, ) -> Result<(), WriteError> { // TODO: i should probably have builders let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_buddy_request( connection: Connected, jid: JID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_subscription_request( connection: Connected, jid: JID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unsubscribe_from_contact( connection: Connected, jid: JID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_delete_contact( logic: &ClientLogic, connection: Connected, jid: JID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: Some(stanza::roster::Subscription::Remove), groups: Vec::new(), }], })), errors: Vec::new(), }); let result = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, // don't really need to check matching id as request() does this anyway }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_update_contact( logic: &ClientLogic, connection: Connected, jid: JID, contact_update: ContactUpdate, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let groups = Vec::from_iter( contact_update .groups .into_iter() .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: contact_update.name, subscription: None, groups, }], })), errors: Vec::new(), }); let response = logic .pending() .request(&connection, set_stanza, iq_id.clone()) .await?; match response { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => Ok(()), ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { Err(RosterError::StanzaError(error.clone())) } else { Err(RosterError::UnexpectedStanza(s.clone())) } } s => Err(RosterError::UnexpectedStanza(s)), } } pub async fn handle_set_status( logic: &ClientLogic, connection: Connected, online: Online, ) -> Result<(), StatusError> { logic .db() .upsert_cached_status(online.clone()) .await .map_err(|e| DatabaseError(e.into()))?; connection .write_handle() .write(Stanza::Presence(online.into_stanza(None))) .await?; Ok(()) } pub async fn handle_send_message( logic: &ClientLogic, connection: Connected, jid: JID, body: Body, ) -> Result<(), WriteError> { let id = Uuid::new_v4(); let timestamp = Utc::now(); let message = Stanza::Message(stanza::client::message::Message { from: Some(connection.jid().clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? lang: None, subject: None, body: Some(stanza::client::message::Body { lang: None, body: Some(body.body.clone()), }), thread: None, // include delay to have a consistent timestamp between server and client delay: Some(Delay { from: None, stamp: timestamp, }), }); connection.write_handle().write(message).await?; let mut message = Message { id, from: connection.jid().clone(), body, timestamp, }; info!("sent message: {:?}", message); if let Err(e) = logic .db() .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) .await { // TODO: should these really be handle_error or just the error macro? logic .handle_error(MessageSendError::MessageHistory(e.into()).into()) .await; } // TODO: don't do this, have separate from from details message.from = message.from.as_bare(); let _ = logic .update_sender() .send(UpdateMessage::Message { to: jid, message }) .await; Ok(()) // TODO: refactor this to send a sending updatemessage, then update or something like that } pub async fn handle_send_presence( connection: Connected, jid: Option, presence: PresenceType, ) -> Result<(), WriteError> { let mut presence: stanza::client::presence::Presence = presence.into(); presence.to = jid; connection .write_handle() .write(Stanza::Presence(presence)) .await?; Ok(()) } // TODO: could probably macro-ise? pub async fn handle_online_result( logic: &ClientLogic, command: Command, connection: Connected, ) -> Result<(), Error> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; let _ = result_sender.send(roster); } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; let _ = sender.send(chats); } Command::GetChatsOrdered(sender) => { let chats = handle_get_chats_ordered(logic).await; let _ = sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { let chats = handle_get_chats_ordered_with_latest_messages(logic).await; let _ = sender.send(chats); } Command::GetChat(jid, sender) => { let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); } Command::GetMessages(jid, sender) => { let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; let _ = sender.send(result); } Command::DeleteMessage(uuid, sender) => { let result = handle_delete_messaage(logic, uuid).await; let _ = sender.send(result); } Command::GetUser(jid, sender) => { let user = handle_get_user(logic, jid).await; let _ = sender.send(user); } // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { let result = handle_add_contact(logic, connection, jid).await; let _ = sender.send(result); } Command::BuddyRequest(jid, sender) => { let result = handle_buddy_request(connection, jid).await; let _ = sender.send(result); } Command::SubscriptionRequest(jid, sender) => { let result = handle_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { let result = handle_accept_buddy_request(connection, jid).await; let _ = sender.send(result); } Command::AcceptSubscriptionRequest(jid, sender) => { let result = handle_accept_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { let result = handle_unsubscribe_from_contact(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeContact(jid, sender) => { let result = handle_unsubscribe_contact(connection, jid).await; let _ = sender.send(result); } Command::UnfriendContact(jid, sender) => { let result = handle_unfriend_contact(connection, jid).await; let _ = sender.send(result); } Command::DeleteContact(jid, sender) => { let result = handle_delete_contact(logic, connection, jid).await; let _ = sender.send(result); } Command::UpdateContact(jid, contact_update, sender) => { let result = handle_update_contact(logic, connection, jid, contact_update).await; let _ = sender.send(result); } Command::SetStatus(online, sender) => { let result = handle_set_status(logic, connection, online).await; let _ = sender.send(result); } // TODO: offline message queue Command::SendMessage(jid, body, sender) => { let result = handle_send_message(logic, connection, jid, body).await; let _ = sender.send(result); } Command::SendPresence(jid, presence, sender) => { let result = handle_send_presence(connection, jid, presence).await; let _ = sender.send(result); } } Ok(()) }