use chrono::Utc;
use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
use stanza::{
client::{
Stanza,
iq::{self, Iq, IqType, Query},
},
xep_0030::{info, items},
xep_0203::Delay,
};
use tokio::sync::oneshot;
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
Command, UpdateMessage,
chat::{Body, Message},
disco::{Info, Items},
error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError},
presence::{Online, Presence, PresenceType},
roster::{Contact, ContactUpdate},
};
use super::{
ClientLogic,
local_only::{
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
handle_get_messages, handle_get_user,
},
};
pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
let result = handle_online_result(&logic, command, connection).await;
match result {
Ok(_) => {}
Err(e) => logic.handle_error(e).await,
}
}
pub async fn handle_get_roster(
logic: &ClientLogic,
connection: Connected,
) -> Result<Vec<Contact>, RosterError> {
let iq_id = Uuid::new_v4().to_string();
let stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: Vec::new(),
})),
errors: Vec::new(),
});
let response = logic
.pending()
.request(&connection, stanza, iq_id.clone())
.await?;
// TODO: timeout
match response {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
logic
.handle_error(Error::Roster(RosterError::Cache(e.into())))
.await;
};
Ok(contacts)
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
Err(RosterError::StanzaError(error.clone()))
} else {
Err(RosterError::UnexpectedStanza(s.clone()))
}
}
s => Err(RosterError::UnexpectedStanza(s)),
}
}
pub async fn handle_add_contact(
logic: &ClientLogic,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: None,
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let response = logic
.pending()
.request(&connection, set_stanza, iq_id.clone())
.await?;
match response {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => Ok(()),
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
Err(RosterError::StanzaError(error.clone()))
} else {
Err(RosterError::UnexpectedStanza(s.clone()))
}
}
s => Err(RosterError::UnexpectedStanza(s)),
}
}
pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_subscription_request(
connection: Connected,
jid: JID,
) -> Result<(), WriteError> {
// TODO: i should probably have builders
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_buddy_request(
connection: Connected,
jid: JID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_subscription_request(
connection: Connected,
jid: JID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_unsubscribe_from_contact(
connection: Connected,
jid: JID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_delete_contact(
logic: &ClientLogic,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: Some(stanza::roster::Subscription::Remove),
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let result = logic
.pending()
.request(&connection, set_stanza, iq_id.clone())
.await?;
match result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
// don't really need to check matching id as request() does this anyway
}) if id == iq_id && r#type == IqType::Result => Ok(()),
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
Err(RosterError::StanzaError(error.clone()))
} else {
Err(RosterError::UnexpectedStanza(s.clone()))
}
}
s => Err(RosterError::UnexpectedStanza(s)),
}
}
pub async fn handle_update_contact(
logic: &ClientLogic,
connection: Connected,
jid: JID,
contact_update: ContactUpdate,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let groups = Vec::from_iter(
contact_update
.groups
.into_iter()
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: contact_update.name,
subscription: None,
groups,
}],
})),
errors: Vec::new(),
});
let response = logic
.pending()
.request(&connection, set_stanza, iq_id.clone())
.await?;
match response {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => Ok(()),
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
Err(RosterError::StanzaError(error.clone()))
} else {
Err(RosterError::UnexpectedStanza(s.clone()))
}
}
s => Err(RosterError::UnexpectedStanza(s)),
}
}
pub async fn handle_set_status(
logic: &ClientLogic,
connection: Connected,
online: Online,
) -> Result<(), StatusError> {
logic.db().upsert_cached_status(online.clone()).await?;
connection
.write_handle()
.write(Stanza::Presence(online.into_stanza(None)))
.await?;
Ok(())
}
pub async fn handle_send_message(
logic: &ClientLogic,
connection: Connected,
jid: JID,
body: Body,
) -> Result<(), WriteError> {
let id = Uuid::new_v4();
let timestamp = Utc::now();
let message = Stanza::Message(stanza::client::message::Message {
from: Some(connection.jid().clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
lang: None,
subject: None,
body: Some(stanza::client::message::Body {
lang: None,
body: Some(body.body.clone()),
}),
thread: None,
// include delay to have a consistent timestamp between server and client
delay: Some(Delay {
from: None,
stamp: timestamp,
}),
});
connection.write_handle().write(message).await?;
let mut message = Message {
id,
from: connection.jid().clone(),
body,
timestamp,
};
info!("sent message: {:?}", message);
if let Err(e) = logic
.db()
.create_message_with_self_resource_and_chat(message.clone(), jid.clone())
.await
{
// TODO: should these really be handle_error or just the error macro?
logic
.handle_error(MessageSendError::MessageHistory(e.into()).into())
.await;
}
// TODO: don't do this, have separate from from details
message.from = message.from.as_bare();
let _ = logic
.update_sender()
.send(UpdateMessage::Message { to: jid, message })
.await;
Ok(())
// TODO: refactor this to send a sending updatemessage, then update or something like that
}
pub async fn handle_send_presence(
connection: Connected,
jid: Option<JID>,
presence: PresenceType,
) -> Result<(), WriteError> {
let mut presence: stanza::client::presence::Presence = presence.into();
presence.to = jid;
connection
.write_handle()
.write(Stanza::Presence(presence))
.await?;
Ok(())
}
pub async fn handle_disco_info(
logic: &ClientLogic,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
) -> Result<Info, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
from: Some(connection.jid().clone()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
lang: None,
query: Some(Query::DiscoInfo(info::Query {
node,
features: Vec::new(),
identities: Vec::new(),
})),
errors: Vec::new(),
};
match logic
.pending()
.request(&connection, Stanza::Iq(request), id)
.await?
{
Stanza::Iq(Iq {
from,
r#type,
query,
mut errors,
..
// TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
from == Some(connection.jid().as_bare())
} else {
false
}
} {
match r#type {
IqType::Result => {
if let Some(query) = query {
match query {
Query::DiscoInfo(info) => Ok(info.into()),
q => Err(DiscoError::MismatchedQuery(q)),
}
} else {
Err(DiscoError::MissingQuery)
}
}
IqType::Error => {
if let Some(error) = errors.pop() {
Err(error.into())
} else {
Err(DiscoError::MissingError)
}
}
_ => unreachable!(),
}
} else {
Err(DiscoError::IncorrectEntity(
from.unwrap_or_else(|| connection.jid().as_bare()),
))
}
}
s => Err(DiscoError::UnexpectedStanza(s)),
}
}
pub async fn handle_disco_items(
logic: &ClientLogic,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
) -> Result<Items, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
from: Some(connection.jid().clone()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
lang: None,
query: Some(Query::DiscoItems(items::Query {
node,
items: Vec::new(),
})),
errors: Vec::new(),
};
match logic
.pending()
.request(&connection, Stanza::Iq(request), id)
.await?
{
Stanza::Iq(Iq {
from,
r#type,
query,
mut errors,
..
// TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
from == Some(connection.jid().as_bare())
} else {
false
}
} {
match r#type {
IqType::Result => {
if let Some(query) = query {
match query {
Query::DiscoItems(items) => Ok(items.into()),
q => Err(DiscoError::MismatchedQuery(q)),
}
} else {
Err(DiscoError::MissingQuery)
}
}
IqType::Error => {
if let Some(error) = errors.pop() {
Err(error.into())
} else {
Err(DiscoError::MissingError)
}
}
_ => unreachable!(),
}
} else {
Err(DiscoError::IncorrectEntity(
from.unwrap_or_else(|| connection.jid().as_bare()),
))
}
}
s => Err(DiscoError::UnexpectedStanza(s)),
}
}
// TODO: could probably macro-ise?
pub async fn handle_online_result(
logic: &ClientLogic,
command: Command,
connection: Connected,
) -> Result<(), Error> {
match command {
Command::GetRoster(result_sender) => {
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
}
Command::GetChatsOrdered(sender) => {
let chats = handle_get_chats_ordered(logic).await;
let _ = sender.send(chats);
}
Command::GetChatsOrderedWithLatestMessages(sender) => {
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
}
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
}
Command::DeleteMessage(uuid, sender) => {
let result = handle_delete_messaage(logic, uuid).await;
let _ = sender.send(result);
}
Command::GetUser(jid, sender) => {
let user = handle_get_user(logic, jid).await;
let _ = sender.send(user);
}
// TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
let result = handle_add_contact(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::BuddyRequest(jid, sender) => {
let result = handle_buddy_request(connection, jid).await;
let _ = sender.send(result);
}
Command::SubscriptionRequest(jid, sender) => {
let result = handle_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
let result = handle_accept_buddy_request(connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptSubscriptionRequest(jid, sender) => {
let result = handle_accept_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
let result = handle_unsubscribe_from_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeContact(jid, sender) => {
let result = handle_unsubscribe_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnfriendContact(jid, sender) => {
let result = handle_unfriend_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::DeleteContact(jid, sender) => {
let result = handle_delete_contact(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::UpdateContact(jid, contact_update, sender) => {
let result = handle_update_contact(logic, connection, jid, contact_update).await;
let _ = sender.send(result);
}
Command::SetStatus(online, sender) => {
let result = handle_set_status(logic, connection, online).await;
let _ = sender.send(result);
}
// TODO: offline message queue
Command::SendMessage(jid, body, sender) => {
let result = handle_send_message(logic, connection, jid, body).await;
let _ = sender.send(result);
}
Command::SendPresence(jid, presence, sender) => {
let result = handle_send_presence(connection, jid, presence).await;
let _ = sender.send(result);
}
Command::DiscoInfo(jid, node, sender) => {
let result = handle_disco_info(logic, connection, jid, node).await;
let _ = sender.send(result);
}
Command::DiscoItems(jid, node, sender) => {
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
}
Ok(())
}