aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/online.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r--filamento/src/logic/online.rs1152
1 files changed, 570 insertions, 582 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index 967ebb2..8bbeaa5 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -1,8 +1,12 @@
use chrono::Utc;
+use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
-use stanza::client::{
- Stanza,
- iq::{self, Iq, IqType},
+use stanza::{
+ client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+ },
+ xep_0203::Delay,
};
use tokio::sync::oneshot;
use tracing::{debug, info};
@@ -10,641 +14,625 @@ use uuid::Uuid;
use crate::{
Command, UpdateMessage,
- chat::Message,
- error::{Error, MessageSendError, RosterError, StatusError},
- roster::Contact,
+ chat::{Body, Message},
+ error::{DatabaseError, Error, MessageSendError, RosterError, StatusError},
+ presence::{Online, Presence, PresenceType},
+ roster::{Contact, ContactUpdate},
};
-use super::ClientLogic;
+use super::{
+ ClientLogic,
+ local_only::{
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
+ handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
+ handle_get_messages, handle_get_user,
+ },
+};
pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
- match command {
- Command::GetRoster(result_sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let (send, iq_recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.to_string(),
- to: None,
- r#type: IqType::Get,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: Vec::new(),
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- let _ = connection
- .write_handle()
- .send(WriteMessage {
- stanza,
- respond_to: send,
- })
- .await;
- // TODO: timeout
- match recv.await {
- Ok(Ok(())) => info!("roster request sent"),
- Ok(Err(e)) => {
- // TODO: log errors if fail to send
- let _ = result_sender.send(Err(RosterError::Write(e.into())));
- return;
- }
- Err(e) => {
- let _ =
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
+ let result = handle_online_result(&logic, command, connection).await;
+ match result {
+ Ok(_) => {}
+ Err(e) => logic.handle_error(e).await,
+ }
+}
+
+pub async fn handle_get_roster(
+ logic: &ClientLogic,
+ connection: Connected,
+) -> Result<Vec<Contact>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
};
- // TODO: timeout
- match iq_recv.await {
- Ok(Ok(stanza)) => match stanza {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- let contacts: Vec<Contact> =
- items.into_iter().map(|item| item.into()).collect();
- if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
- logic
- .handle_error(Error::Roster(RosterError::Cache(e.into())))
- .await;
- };
- result_sender.send(Ok(contacts));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- result_sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- result_sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Ok(Err(e)) => {
- result_sender.send(Err(RosterError::Read(e)));
- return;
- }
- Err(e) => {
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
+ Ok(contacts)
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_add_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_subscription_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_accept_buddy_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_accept_subscription_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unsubscribe_from_contact(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_delete_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let result = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ // don't really need to check matching id as request() does this anyway
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_update_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+ contact_update: ContactUpdate,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
}
}
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_set_status(
+ logic: &ClientLogic,
+ connection: Connected,
+ online: Online,
+) -> Result<(), StatusError> {
+ logic
+ .db()
+ .upsert_cached_status(online.clone())
+ .await
+ .map_err(|e| DatabaseError(e.into()))?;
+ connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await?;
+ Ok(())
+}
+
+pub async fn handle_send_message(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+ body: Body,
+) -> Result<(), WriteError> {
+ let id = Uuid::new_v4();
+ let timestamp = Utc::now();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ // include delay to have a consistent timestamp between server and client
+ delay: Some(Delay {
+ from: None,
+ stamp: timestamp,
+ }),
+ });
+ connection.write_handle().write(message).await?;
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp,
+ };
+ info!("sent message: {:?}", message);
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
+ .await
+ {
+ // TODO: should these really be handle_error or just the error macro?
+ logic
+ .handle_error(MessageSendError::MessageHistory(e.into()).into())
+ .await;
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ Ok(())
+ // TODO: refactor this to send a sending updatemessage, then update or something like that
+}
+
+pub async fn handle_send_presence(
+ connection: Connected,
+ jid: Option<JID>,
+ presence: PresenceType,
+) -> Result<(), WriteError> {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ presence.to = jid;
+ connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await?;
+ Ok(())
+}
+
+// TODO: could probably macro-ise?
+pub async fn handle_online_result(
+ logic: &ClientLogic,
+ command: Command,
+ connection: Connected,
+) -> Result<(), Error> {
+ match command {
+ Command::GetRoster(result_sender) => {
+ let roster = handle_get_roster(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
- let chats = logic.db().read_chats().await.map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChatsOrdered(sender) => {
- let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats_ordered(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = logic
- .db()
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChat(jid, sender) => {
- let chats = logic.db().read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
+ let chat = handle_get_chat(logic, jid).await;
+ let _ = sender.send(chat);
}
Command::GetMessages(jid, sender) => {
- let messages = logic
- .db()
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
+ let messages = handle_get_messages(logic, jid).await;
+ let _ = sender.send(messages);
}
Command::DeleteChat(jid, sender) => {
- let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
+ let result = handle_delete_chat(logic, jid).await;
+ let _ = sender.send(result);
}
Command::DeleteMessage(uuid, sender) => {
- let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
+ let result = handle_delete_messaage(logic, uuid).await;
+ let _ = sender.send(result);
}
Command::GetUser(jid, sender) => {
- let user = logic.db().read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
+ let user = handle_get_user(logic, jid).await;
+ let _ = sender.send(user);
}
// TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: None,
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- // TODO: write_handle send helper function
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_add_contact(logic, connection, jid).await;
+ let _ = sender.send(result);
}
Command::BuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_buddy_request(connection, jid).await;
+ let _ = sender.send(result);
}
Command::SubscriptionRequest(jid, sender) => {
- // TODO: i should probably have builders
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_accept_buddy_request(connection, jid).await;
+ let _ = sender.send(result);
}
Command::AcceptSubscriptionRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_accept_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_unsubscribe_from_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_unsubscribe_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnfriendContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_unfriend_contact(connection, jid).await;
+ let _ = sender.send(result);
}
Command::DeleteContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: Some(stanza::roster::Subscription::Remove),
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_delete_contact(logic, connection, jid).await;
+ let _ = sender.send(result);
}
Command::UpdateContact(jid, contact_update, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let groups = Vec::from_iter(
- contact_update
- .groups
- .into_iter()
- .map(|group| stanza::roster::Group(Some(group))),
- );
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: contact_update.name,
- subscription: None,
- groups,
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_update_contact(logic, connection, jid, contact_update).await;
+ let _ = sender.send(result);
}
Command::SetStatus(online, sender) => {
- let result = logic.db().upsert_cached_status(online.clone()).await;
- if let Err(e) = result {
- logic
- .handle_error(StatusError::Cache(e.into()).into())
- .await;
- }
- let result = connection
- .write_handle()
- .write(Stanza::Presence(online.into_stanza(None)))
- .await
- .map_err(|e| StatusError::Write(e));
- // .map_err(|e| StatusError::Write(e));
+ let result = handle_set_status(logic, connection, online).await;
let _ = sender.send(result);
}
// TODO: offline message queue
Command::SendMessage(jid, body, sender) => {
- let id = Uuid::new_v4();
- let message = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid().clone()),
- id: Some(id.to_string()),
- to: Some(jid.clone()),
- // TODO: specify message type
- r#type: stanza::client::message::MessageType::Chat,
- // TODO: lang ?
- lang: None,
- subject: None,
- body: Some(stanza::client::message::Body {
- lang: None,
- body: Some(body.body.clone()),
- }),
- thread: None,
- delay: None,
- });
- let _ = sender.send(Ok(()));
- // let _ = sender.send(Ok(message.clone()));
- let result = connection.write_handle().write(message).await;
- match result {
- Ok(_) => {
- let mut message = Message {
- id,
- from: connection.jid().clone(),
- body,
- timestamp: Utc::now(),
- };
- info!("send message {:?}", message);
- if let Err(e) = logic
- .db()
- .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
- .await
- {
- logic
- .handle_error(MessageSendError::MessageHistory(e.into()).into())
- .await;
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: jid, message })
- .await;
- }
- Err(_) => {
- // let _ = sender.send(result);
- }
- }
+ let result = handle_send_message(logic, connection, jid, body).await;
+ let _ = sender.send(result);
}
Command::SendPresence(jid, presence, sender) => {
- let mut presence: stanza::client::presence::Presence = presence.into();
- if let Some(jid) = jid {
- presence.to = Some(jid);
- };
- let result = connection
- .write_handle()
- .write(Stanza::Presence(presence))
- .await;
- // .map_err(|e| StatusError::Write(e));
+ let result = handle_send_presence(connection, jid, presence).await;
let _ = sender.send(result);
}
}
+ Ok(())
}