diff options
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r-- | filamento/src/logic/online.rs | 1152 |
1 files changed, 570 insertions, 582 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 967ebb2..8bbeaa5 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,8 +1,12 @@ use chrono::Utc; +use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; -use stanza::client::{ - Stanza, - iq::{self, Iq, IqType}, +use stanza::{ + client::{ + Stanza, + iq::{self, Iq, IqType}, + }, + xep_0203::Delay, }; use tokio::sync::oneshot; use tracing::{debug, info}; @@ -10,641 +14,625 @@ use uuid::Uuid; use crate::{ Command, UpdateMessage, - chat::Message, - error::{Error, MessageSendError, RosterError, StatusError}, - roster::Contact, + chat::{Body, Message}, + error::{DatabaseError, Error, MessageSendError, RosterError, StatusError}, + presence::{Online, Presence, PresenceType}, + roster::{Contact, ContactUpdate}, }; -use super::ClientLogic; +use super::{ + ClientLogic, + local_only::{ + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, + handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, + handle_get_messages, handle_get_user, + }, +}; pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { - match command { - Command::GetRoster(result_sender) => { - let iq_id = Uuid::new_v4().to_string(); - let (send, iq_recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.to_string(), - to: None, - r#type: IqType::Get, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: Vec::new(), - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - let _ = connection - .write_handle() - .send(WriteMessage { - stanza, - respond_to: send, - }) - .await; - // TODO: timeout - match recv.await { - Ok(Ok(())) => info!("roster request sent"), - Ok(Err(e)) => { - // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError::Write(e.into()))); - return; - } - Err(e) => { - let _ = - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } + let result = handle_online_result(&logic, command, connection).await; + match result { + Ok(_) => {} + Err(e) => logic.handle_error(e).await, + } +} + +pub async fn handle_get_roster( + logic: &ClientLogic, + connection: Connected, +) -> Result<Vec<Contact>, RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, stanza, iq_id.clone()) + .await?; + // TODO: timeout + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .handle_error(Error::Roster(RosterError::Cache(e.into()))) + .await; }; - // TODO: timeout - match iq_recv.await { - Ok(Ok(stanza)) => match stanza { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - let contacts: Vec<Contact> = - items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { - logic - .handle_error(Error::Roster(RosterError::Cache(e.into()))) - .await; - }; - result_sender.send(Ok(contacts)); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - result_sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - result_sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Ok(Err(e)) => { - result_sender.send(Err(RosterError::Read(e))); - return; - } - Err(e) => { - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } + Ok(contacts) + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_add_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_subscription_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_accept_buddy_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_accept_subscription_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unsubscribe_from_contact( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_delete_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let result = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + // don't really need to check matching id as request() does this anyway + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_update_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, + contact_update: ContactUpdate, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) } } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_set_status( + logic: &ClientLogic, + connection: Connected, + online: Online, +) -> Result<(), StatusError> { + logic + .db() + .upsert_cached_status(online.clone()) + .await + .map_err(|e| DatabaseError(e.into()))?; + connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await?; + Ok(()) +} + +pub async fn handle_send_message( + logic: &ClientLogic, + connection: Connected, + jid: JID, + body: Body, +) -> Result<(), WriteError> { + let id = Uuid::new_v4(); + let timestamp = Utc::now(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + // include delay to have a consistent timestamp between server and client + delay: Some(Delay { + from: None, + stamp: timestamp, + }), + }); + connection.write_handle().write(message).await?; + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp, + }; + info!("sent message: {:?}", message); + if let Err(e) = logic + .db() + .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) + .await + { + // TODO: should these really be handle_error or just the error macro? + logic + .handle_error(MessageSendError::MessageHistory(e.into()).into()) + .await; + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = logic + .update_sender() + .send(UpdateMessage::Message { to: jid, message }) + .await; + Ok(()) + // TODO: refactor this to send a sending updatemessage, then update or something like that +} + +pub async fn handle_send_presence( + connection: Connected, + jid: Option<JID>, + presence: PresenceType, +) -> Result<(), WriteError> { + let mut presence: stanza::client::presence::Presence = presence.into(); + presence.to = jid; + connection + .write_handle() + .write(Stanza::Presence(presence)) + .await?; + Ok(()) +} + +// TODO: could probably macro-ise? +pub async fn handle_online_result( + logic: &ClientLogic, + command: Command, + connection: Connected, +) -> Result<(), Error> { + match command { + Command::GetRoster(result_sender) => { + let roster = handle_get_roster(logic, connection).await; + let _ = result_sender.send(roster); + } Command::GetChats(sender) => { - let chats = logic.db().read_chats().await.map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats(logic).await; + let _ = sender.send(chats); } Command::GetChatsOrdered(sender) => { - let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats_ordered(logic).await; + let _ = sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = logic - .db() - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats_ordered_with_latest_messages(logic).await; + let _ = sender.send(chats); } Command::GetChat(jid, sender) => { - let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); + let chat = handle_get_chat(logic, jid).await; + let _ = sender.send(chat); } Command::GetMessages(jid, sender) => { - let messages = logic - .db() - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); + let messages = handle_get_messages(logic, jid).await; + let _ = sender.send(messages); } Command::DeleteChat(jid, sender) => { - let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); + let result = handle_delete_chat(logic, jid).await; + let _ = sender.send(result); } Command::DeleteMessage(uuid, sender) => { - let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); + let result = handle_delete_messaage(logic, uuid).await; + let _ = sender.send(result); } Command::GetUser(jid, sender) => { - let user = logic.db().read_user(jid).await.map_err(|e| e.into()); - sender.send(user); + let user = handle_get_user(logic, jid).await; + let _ = sender.send(user); } // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: None, - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - // TODO: write_handle send helper function - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_add_contact(logic, connection, jid).await; + let _ = sender.send(result); } Command::BuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_buddy_request(connection, jid).await; + let _ = sender.send(result); } Command::SubscriptionRequest(jid, sender) => { - // TODO: i should probably have builders - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_accept_buddy_request(connection, jid).await; + let _ = sender.send(result); } Command::AcceptSubscriptionRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_accept_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_unsubscribe_from_contact(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_unsubscribe_contact(connection, jid).await; let _ = sender.send(result); } Command::UnfriendContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_unfriend_contact(connection, jid).await; + let _ = sender.send(result); } Command::DeleteContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: Some(stanza::roster::Subscription::Remove), - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_delete_contact(logic, connection, jid).await; + let _ = sender.send(result); } Command::UpdateContact(jid, contact_update, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let groups = Vec::from_iter( - contact_update - .groups - .into_iter() - .map(|group| stanza::roster::Group(Some(group))), - ); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: contact_update.name, - subscription: None, - groups, - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_update_contact(logic, connection, jid, contact_update).await; + let _ = sender.send(result); } Command::SetStatus(online, sender) => { - let result = logic.db().upsert_cached_status(online.clone()).await; - if let Err(e) = result { - logic - .handle_error(StatusError::Cache(e.into()).into()) - .await; - } - let result = connection - .write_handle() - .write(Stanza::Presence(online.into_stanza(None))) - .await - .map_err(|e| StatusError::Write(e)); - // .map_err(|e| StatusError::Write(e)); + let result = handle_set_status(logic, connection, online).await; let _ = sender.send(result); } // TODO: offline message queue Command::SendMessage(jid, body, sender) => { - let id = Uuid::new_v4(); - let message = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid().clone()), - id: Some(id.to_string()), - to: Some(jid.clone()), - // TODO: specify message type - r#type: stanza::client::message::MessageType::Chat, - // TODO: lang ? - lang: None, - subject: None, - body: Some(stanza::client::message::Body { - lang: None, - body: Some(body.body.clone()), - }), - thread: None, - delay: None, - }); - let _ = sender.send(Ok(())); - // let _ = sender.send(Ok(message.clone())); - let result = connection.write_handle().write(message).await; - match result { - Ok(_) => { - let mut message = Message { - id, - from: connection.jid().clone(), - body, - timestamp: Utc::now(), - }; - info!("send message {:?}", message); - if let Err(e) = logic - .db() - .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) - .await - { - logic - .handle_error(MessageSendError::MessageHistory(e.into()).into()) - .await; - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = logic - .update_sender() - .send(UpdateMessage::Message { to: jid, message }) - .await; - } - Err(_) => { - // let _ = sender.send(result); - } - } + let result = handle_send_message(logic, connection, jid, body).await; + let _ = sender.send(result); } Command::SendPresence(jid, presence, sender) => { - let mut presence: stanza::client::presence::Presence = presence.into(); - if let Some(jid) = jid { - presence.to = Some(jid); - }; - let result = connection - .write_handle() - .write(Stanza::Presence(presence)) - .await; - // .map_err(|e| StatusError::Write(e)); + let result = handle_send_presence(connection, jid, presence).await; let _ = sender.send(result); } } + Ok(()) } |