use chrono::Utc;
use lampada::{Connected, WriteMessage, error::WriteError};
use stanza::client::{
Stanza,
iq::{self, Iq, IqType},
};
use tokio::sync::oneshot;
use tracing::{debug, info};
use uuid::Uuid;
use crate::{
Command, UpdateMessage,
chat::Message,
error::{Error, MessageSendError, RosterError, StatusError},
roster::Contact,
};
use super::ClientLogic;
/// Handles a single [`Command`] issued while the client has a live connection.
///
/// Every command variant carries a oneshot sender through which the outcome is reported
/// back to the caller. Failing to deliver on that sender (the caller has gone away) is
/// deliberately ignored everywhere in this function.
///
/// Broadly, the commands fall into four groups:
///
/// * roster IQs (`GetRoster`, `AddContact`, `DeleteContact`, `UpdateContact`): a reply
///   channel is registered in `logic.pending()` under a fresh IQ id, the IQ is written to
///   the connection, and the reply stanza is awaited and translated into a result;
/// * database reads/deletes (`GetChats*`, `GetChat`, `GetMessages`, `DeleteChat`,
///   `DeleteMessage`, `GetUser`): forwarded to `logic.db()`;
/// * presence subscription management (`BuddyRequest`, `SubscriptionRequest`,
///   `AcceptBuddyRequest`, `AcceptSubscriptionRequest`, `UnsubscribeFromContact`,
///   `UnsubscribeContact`, `UnfriendContact`): one or two presence stanzas are written;
/// * `SetStatus`, `SendMessage` and `SendPresence`.
///
/// # Parameters
///
/// * `logic` - gives access to the database, the pending-IQ map and the update channel.
/// * `command` - the command to execute.
/// * `connection` - the active connection; provides the own JID and the write handle.
///
/// NOTE(review): when writing a roster IQ fails, the entry previously inserted into
/// `logic.pending()` is not removed here — confirm that it is cleaned up elsewhere.
pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
    // Builds a presence stanza addressed to `to` that carries nothing but a presence type.
    // TODO: i should probably have builders
    let presence_to = |to, kind| {
        Stanza::Presence(stanza::client::presence::Presence {
            from: None,
            id: None,
            to: Some(to),
            r#type: Some(kind),
            lang: None,
            show: None,
            status: None,
            priority: None,
            errors: Vec::new(),
            delay: None,
        })
    };

    match command {
        Command::GetRoster(result_sender) => {
            let iq_id = Uuid::new_v4().to_string();
            // Register for the reply before the request is written.
            let (send, iq_recv) = oneshot::channel();
            logic.pending().lock().await.insert(iq_id.clone(), send);
            let stanza = Stanza::Iq(Iq {
                from: Some(connection.jid().clone()),
                id: iq_id.clone(),
                to: None,
                r#type: IqType::Get,
                lang: None,
                query: Some(iq::Query::Roster(stanza::roster::Query {
                    ver: None,
                    items: Vec::new(),
                })),
                errors: Vec::new(),
            });
            let (send, recv) = oneshot::channel();
            // If this send fails, `respond_to` is dropped and `recv` below yields an error.
            let _ = connection
                .write_handle()
                .send(WriteMessage {
                    stanza,
                    respond_to: send,
                })
                .await;
            // TODO: timeout
            match recv.await {
                Ok(Ok(())) => info!("roster request sent"),
                Ok(Err(e)) => {
                    // TODO: log errors if fail to send
                    let _ = result_sender.send(Err(RosterError::Write(e.into())));
                    return;
                }
                Err(e) => {
                    let _ =
                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
                    return;
                }
            };
            // TODO: timeout
            match iq_recv.await {
                Ok(Ok(stanza)) => match stanza {
                    Stanza::Iq(Iq {
                        id,
                        r#type,
                        query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
                        ..
                    }) if id == iq_id && r#type == IqType::Result => {
                        let contacts: Vec<Contact> =
                            items.into_iter().map(|item| item.into()).collect();
                        // A cache failure is reported as an update but does not fail the
                        // request: the caller still receives the fetched roster.
                        if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
                            let _ = logic
                                .update_sender()
                                .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
                                    e.into(),
                                ))))
                                .await;
                        }
                        let _ = result_sender.send(Ok(contacts));
                    }
                    ref s @ Stanza::Iq(Iq {
                        ref id,
                        r#type,
                        ref errors,
                        ..
                    }) if *id == iq_id && r#type == IqType::Error => {
                        let _ = match errors.first() {
                            Some(error) => {
                                result_sender.send(Err(RosterError::StanzaError(error.clone())))
                            }
                            // An error reply without an error payload is itself unexpected.
                            None => {
                                result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())))
                            }
                        };
                    }
                    s => {
                        let _ = result_sender.send(Err(RosterError::UnexpectedStanza(s)));
                    }
                },
                Ok(Err(e)) => {
                    let _ = result_sender.send(Err(RosterError::Read(e)));
                }
                Err(e) => {
                    let _ =
                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
                }
            }
        }
        Command::GetChats(sender) => {
            let chats = logic.db().read_chats().await.map_err(|e| e.into());
            let _ = sender.send(chats);
        }
        Command::GetChatsOrdered(sender) => {
            let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
            let _ = sender.send(chats);
        }
        Command::GetChatsOrderedWithLatestMessages(sender) => {
            let chats = logic
                .db()
                .read_chats_ordered_with_latest_messages()
                .await
                .map_err(|e| e.into());
            let _ = sender.send(chats);
        }
        Command::GetChat(jid, sender) => {
            let chat = logic.db().read_chat(jid).await.map_err(|e| e.into());
            let _ = sender.send(chat);
        }
        Command::GetMessages(jid, sender) => {
            let messages = logic
                .db()
                .read_message_history(jid)
                .await
                .map_err(|e| e.into());
            let _ = sender.send(messages);
        }
        Command::DeleteChat(jid, sender) => {
            let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
            let _ = sender.send(result);
        }
        Command::DeleteMessage(uuid, sender) => {
            let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
            let _ = sender.send(result);
        }
        Command::GetUser(jid, sender) => {
            let user = logic.db().read_user(jid).await.map_err(|e| e.into());
            let _ = sender.send(user);
        }
        // TODO: offline queue to modify roster
        Command::AddContact(jid, sender) => {
            let iq_id = Uuid::new_v4().to_string();
            let set_stanza = Stanza::Iq(Iq {
                from: Some(connection.jid().clone()),
                id: iq_id.clone(),
                to: None,
                r#type: IqType::Set,
                lang: None,
                query: Some(iq::Query::Roster(stanza::roster::Query {
                    ver: None,
                    items: vec![stanza::roster::Item {
                        approved: None,
                        ask: false,
                        jid,
                        name: None,
                        subscription: None,
                        groups: Vec::new(),
                    }],
                })),
                errors: Vec::new(),
            });
            // Register for the reply before the request is written.
            let (send, recv) = oneshot::channel();
            logic.pending().lock().await.insert(iq_id.clone(), send);
            // TODO: write_handle send helper function
            if let Err(e) = connection.write_handle().write(set_stanza).await {
                let _ = sender.send(Err(RosterError::Write(e)));
                return;
            }
            match recv.await {
                Ok(Ok(reply)) => match reply {
                    Stanza::Iq(Iq { id, r#type, .. })
                        if id == iq_id && r#type == IqType::Result =>
                    {
                        let _ = sender.send(Ok(()));
                    }
                    ref s @ Stanza::Iq(Iq {
                        ref id,
                        r#type,
                        ref errors,
                        ..
                    }) if *id == iq_id && r#type == IqType::Error => {
                        let _ = match errors.first() {
                            Some(error) => {
                                sender.send(Err(RosterError::StanzaError(error.clone())))
                            }
                            None => sender.send(Err(RosterError::UnexpectedStanza(s.clone()))),
                        };
                    }
                    s => {
                        let _ = sender.send(Err(RosterError::UnexpectedStanza(s)));
                    }
                },
                Ok(Err(e)) => {
                    let _ = sender.send(Err(e.into()));
                }
                Err(e) => {
                    let _ = sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
                }
            }
        }
        Command::BuddyRequest(jid, sender) => {
            // Ask for the contact's presence, then pre-approve theirs. The second stanza
            // is only written when the first one went out.
            let mut result = connection
                .write_handle()
                .write(presence_to(
                    jid.clone(),
                    stanza::client::presence::PresenceType::Subscribe,
                ))
                .await;
            if result.is_ok() {
                result = connection
                    .write_handle()
                    .write(presence_to(
                        jid,
                        stanza::client::presence::PresenceType::Subscribed,
                    ))
                    .await;
            }
            let _ = sender.send(result);
        }
        Command::SubscriptionRequest(jid, sender) => {
            let result = connection
                .write_handle()
                .write(presence_to(
                    jid,
                    stanza::client::presence::PresenceType::Subscribe,
                ))
                .await;
            let _ = sender.send(result);
        }
        Command::AcceptBuddyRequest(jid, sender) => {
            // Approve the contact's request, then ask for their presence in return. The
            // second stanza is only written when the first one went out.
            let mut result = connection
                .write_handle()
                .write(presence_to(
                    jid.clone(),
                    stanza::client::presence::PresenceType::Subscribed,
                ))
                .await;
            if result.is_ok() {
                result = connection
                    .write_handle()
                    .write(presence_to(
                        jid,
                        stanza::client::presence::PresenceType::Subscribe,
                    ))
                    .await;
            }
            let _ = sender.send(result);
        }
        Command::AcceptSubscriptionRequest(jid, sender) => {
            // Approving an inbound request is signalled with `subscribed`; sending
            // `subscribe` here would instead issue a new outbound request.
            let result = connection
                .write_handle()
                .write(presence_to(
                    jid,
                    stanza::client::presence::PresenceType::Subscribed,
                ))
                .await;
            let _ = sender.send(result);
        }
        Command::UnsubscribeFromContact(jid, sender) => {
            let result = connection
                .write_handle()
                .write(presence_to(
                    jid,
                    stanza::client::presence::PresenceType::Unsubscribe,
                ))
                .await;
            let _ = sender.send(result);
        }
        Command::UnsubscribeContact(jid, sender) => {
            let result = connection
                .write_handle()
                .write(presence_to(
                    jid,
                    stanza::client::presence::PresenceType::Unsubscribed,
                ))
                .await;
            let _ = sender.send(result);
        }
        Command::UnfriendContact(jid, sender) => {
            // Drop both directions of the subscription. The second stanza is only written
            // when the first one went out.
            let mut result = connection
                .write_handle()
                .write(presence_to(
                    jid.clone(),
                    stanza::client::presence::PresenceType::Unsubscribe,
                ))
                .await;
            if result.is_ok() {
                result = connection
                    .write_handle()
                    .write(presence_to(
                        jid,
                        stanza::client::presence::PresenceType::Unsubscribed,
                    ))
                    .await;
            }
            let _ = sender.send(result);
        }
        Command::DeleteContact(jid, sender) => {
            let iq_id = Uuid::new_v4().to_string();
            let set_stanza = Stanza::Iq(Iq {
                from: Some(connection.jid().clone()),
                id: iq_id.clone(),
                to: None,
                r#type: IqType::Set,
                lang: None,
                query: Some(iq::Query::Roster(stanza::roster::Query {
                    ver: None,
                    items: vec![stanza::roster::Item {
                        approved: None,
                        ask: false,
                        jid,
                        name: None,
                        subscription: Some(stanza::roster::Subscription::Remove),
                        groups: Vec::new(),
                    }],
                })),
                errors: Vec::new(),
            });
            // Register for the reply before the request is written.
            let (send, recv) = oneshot::channel();
            logic.pending().lock().await.insert(iq_id.clone(), send);
            if let Err(e) = connection.write_handle().write(set_stanza).await {
                let _ = sender.send(Err(RosterError::Write(e)));
                return;
            }
            match recv.await {
                Ok(Ok(reply)) => match reply {
                    Stanza::Iq(Iq { id, r#type, .. })
                        if id == iq_id && r#type == IqType::Result =>
                    {
                        let _ = sender.send(Ok(()));
                    }
                    ref s @ Stanza::Iq(Iq {
                        ref id,
                        r#type,
                        ref errors,
                        ..
                    }) if *id == iq_id && r#type == IqType::Error => {
                        let _ = match errors.first() {
                            Some(error) => {
                                sender.send(Err(RosterError::StanzaError(error.clone())))
                            }
                            None => sender.send(Err(RosterError::UnexpectedStanza(s.clone()))),
                        };
                    }
                    s => {
                        let _ = sender.send(Err(RosterError::UnexpectedStanza(s)));
                    }
                },
                Ok(Err(e)) => {
                    let _ = sender.send(Err(e.into()));
                }
                Err(e) => {
                    let _ = sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
                }
            }
        }
        Command::UpdateContact(jid, contact_update, sender) => {
            let iq_id = Uuid::new_v4().to_string();
            let groups: Vec<_> = contact_update
                .groups
                .into_iter()
                .map(|group| stanza::roster::Group(Some(group)))
                .collect();
            let set_stanza = Stanza::Iq(Iq {
                from: Some(connection.jid().clone()),
                id: iq_id.clone(),
                to: None,
                r#type: IqType::Set,
                lang: None,
                query: Some(iq::Query::Roster(stanza::roster::Query {
                    ver: None,
                    items: vec![stanza::roster::Item {
                        approved: None,
                        ask: false,
                        jid,
                        name: contact_update.name,
                        subscription: None,
                        groups,
                    }],
                })),
                errors: Vec::new(),
            });
            // Register for the reply before the request is written.
            let (send, recv) = oneshot::channel();
            logic.pending().lock().await.insert(iq_id.clone(), send);
            if let Err(e) = connection.write_handle().write(set_stanza).await {
                let _ = sender.send(Err(RosterError::Write(e)));
                return;
            }
            match recv.await {
                Ok(Ok(reply)) => match reply {
                    Stanza::Iq(Iq { id, r#type, .. })
                        if id == iq_id && r#type == IqType::Result =>
                    {
                        let _ = sender.send(Ok(()));
                    }
                    ref s @ Stanza::Iq(Iq {
                        ref id,
                        r#type,
                        ref errors,
                        ..
                    }) if *id == iq_id && r#type == IqType::Error => {
                        let _ = match errors.first() {
                            Some(error) => {
                                sender.send(Err(RosterError::StanzaError(error.clone())))
                            }
                            None => sender.send(Err(RosterError::UnexpectedStanza(s.clone()))),
                        };
                    }
                    s => {
                        let _ = sender.send(Err(RosterError::UnexpectedStanza(s)));
                    }
                },
                Ok(Err(e)) => {
                    let _ = sender.send(Err(e.into()));
                }
                Err(e) => {
                    let _ = sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
                }
            }
        }
        Command::SetStatus(online, sender) => {
            // A cache failure is reported as an update; the presence is still written.
            if let Err(e) = logic.db().upsert_cached_status(online.clone()).await {
                let _ = logic
                    .update_sender()
                    .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
                        e.into(),
                    ))))
                    .await;
            }
            let result = connection
                .write_handle()
                .write(Stanza::Presence(online.into_stanza(None)))
                .await
                .map_err(StatusError::Write);
            let _ = sender.send(result);
        }
        // TODO: offline message queue
        Command::SendMessage(jid, body, sender) => {
            let id = Uuid::new_v4();
            let stanza = Stanza::Message(stanza::client::message::Message {
                from: Some(connection.jid().clone()),
                id: Some(id.to_string()),
                to: Some(jid.clone()),
                // TODO: specify message type
                r#type: stanza::client::message::MessageType::Chat,
                // TODO: lang ?
                lang: None,
                subject: None,
                body: Some(stanza::client::message::Body {
                    lang: None,
                    body: Some(body.body.clone()),
                }),
                thread: None,
                delay: None,
            });
            // The caller is answered before the stanza is written.
            let _ = sender.send(Ok(()));
            // `sender` is consumed above, so a failed write cannot be reported to the caller
            // and is silently skipped.
            if connection.write_handle().write(stanza).await.is_ok() {
                let mut message = Message {
                    id,
                    from: connection.jid().clone(),
                    body,
                    timestamp: Utc::now(),
                };
                info!("send message {:?}", message);
                if let Err(e) = logic
                    .db()
                    .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
                    .await
                    .map_err(|e| e.into())
                {
                    tracing::error!("{}", e);
                    // The send future must be awaited, otherwise the error update is
                    // dropped without ever being delivered.
                    let _ = logic
                        .update_sender()
                        .send(UpdateMessage::Error(Error::MessageSend(
                            MessageSendError::MessageHistory(e),
                        )))
                        .await;
                }
                // TODO: don't do this, have separate from from details
                message.from = message.from.as_bare();
                let _ = logic
                    .update_sender()
                    .send(UpdateMessage::Message { to: jid, message })
                    .await;
            }
        }
        Command::SendPresence(jid, presence, sender) => {
            let mut presence: stanza::client::presence::Presence = presence.into();
            // Without a target JID the converted presence's `to` is left as it is.
            if let Some(jid) = jid {
                presence.to = Some(jid);
            }
            let result = connection
                .write_handle()
                .write(Stanza::Presence(presence))
                .await;
            let _ = sender.send(result);
        }
    }
}