use std::{
collections::HashMap,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
time::Duration,
};
use chat::{Body, Chat, Message};
use chrono::Utc;
use db::Db;
use error::{
ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
RosterError, StatusError,
};
use futures::FutureExt;
use jid::JID;
use lampada::{
Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
};
use presence::{Offline, Online, Presence, PresenceType, Show};
use roster::{Contact, ContactUpdate};
use stanza::client::{
Stanza,
iq::{self, Iq, IqType},
};
use tokio::{
sync::{Mutex, mpsc, oneshot},
time::timeout,
};
use tracing::{debug, info};
use user::User;
use uuid::Uuid;
pub mod chat;
pub mod db;
pub mod error;
pub mod presence;
pub mod roster;
pub mod user;
pub enum Command {
/// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send presence stanza
// TODO: cache presence stanza
SendPresence(
Option<JID>,
PresenceType,
oneshot::Sender<Result<(), WriteError>>,
),
/// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
/// an xmpp client that is suited for a chat client use case
#[derive(Debug)]
pub struct Client {
sender: mpsc::Sender<CoreClientCommand<Command>>,
timeout: Duration,
}
impl Clone for Client {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
timeout: self.timeout,
}
}
}
impl Deref for Client {
type Target = mpsc::Sender<CoreClientCommand<Command>>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl Client {
pub async fn connect(&self) -> Result<(), ActorError> {
self.send(CoreClientCommand::Connect).await?;
Ok(())
}
pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
self.send(CoreClientCommand::Disconnect).await?;
Ok(())
}
pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_send, update_recv) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
let (_sup_send, sup_recv) = oneshot::channel();
let sup_recv = sup_recv.fuse();
let logic = ClientLogic {
db,
pending: Arc::new(Mutex::new(HashMap::new())),
update_sender: update_send,
};
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
(
Self {
sender: command_sender,
// TODO: configure timeout
timeout: Duration::from_secs(10),
},
update_recv,
)
}
pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetRoster(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let roster = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(roster)
}
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChats(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chats_ordered_with_latest_messages(
&self,
) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
Command::GetChatsOrderedWithLatestMessages(send),
))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chat = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chat)
}
pub async fn get_messages(
&self,
jid: JID,
) -> Result<Vec<Message>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let messages = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(messages)
}
pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn accept_subscription_request(
&self,
jid: JID,
) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
Command::AcceptSubscriptionRequest(jid, send),
))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnfriendContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn update_contact(
&self,
jid: JID,
update: ContactUpdate,
) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UpdateContact(
jid, update, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SendMessage(
jid, body, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
}
#[derive(Clone)]
pub struct ClientLogic {
db: Db,
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
update_sender: mpsc::Sender<UpdateMessage>,
}
impl Logic for ClientLogic {
type Cmd = Command;
async fn handle_connect(self, connection: Connected) {
let (send, recv) = oneshot::channel();
debug!("getting roster");
self.clone()
.handle_online(Command::GetRoster(send), connection.clone())
.await;
debug!("sent roster req");
let roster = recv.await;
debug!("got roster");
match roster {
Ok(r) => match r {
Ok(roster) => {
let online = self.db.read_cached_status().await;
let online = match online {
Ok(online) => online,
Err(e) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
ConnectionJobError::StatusCacheError(e.into()),
)))
.await;
Online::default()
}
};
let (send, recv) = oneshot::channel();
self.clone()
.handle_online(
Command::SendPresence(None, PresenceType::Online(online.clone()), send),
connection,
)
.await;
let set_status = recv.await;
match set_status {
Ok(s) => match s {
Ok(()) => {
let _ = self
.update_sender
.send(UpdateMessage::Online(online, roster))
.await;
}
Err(e) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(e.into())))
.await;
}
},
Err(e) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
)))
.await;
}
}
}
Err(e) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(e.into())))
.await;
}
},
Err(e) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Connecting(
ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
e.into(),
))),
)))
.await;
}
}
}
async fn handle_disconnect(self, connection: Connected) {
// TODO: be able to set offline status message
let offline_presence: stanza::client::presence::Presence =
Offline::default().into_stanza(None);
let stanza = Stanza::Presence(offline_presence);
// TODO: timeout and error check
connection.write_handle().write(stanza).await;
let _ = self
.update_sender
.send(UpdateMessage::Offline(Offline::default()))
.await;
}
async fn handle_stanza(
self,
stanza: Stanza,
connection: Connected,
supervisor: SupervisorSender,
) {
match stanza {
Stanza::Message(stanza_message) => {
if let Some(mut from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
let mut message = Message {
id: stanza_message
.id
// TODO: proper id storage
.map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
.unwrap_or_else(|| Uuid::new_v4()),
from: from.clone(),
timestamp,
body: Body {
// TODO: should this be an option?
body: stanza_message
.body
.map(|body| body.body)
.unwrap_or_default()
.unwrap_or_default(),
},
};
// TODO: can this be more efficient?
let result = self
.db
.create_message_with_user_resource_and_chat(message.clone(), from.clone())
.await;
if let Err(e) = result {
tracing::error!("messagecreate");
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::MessageRecv(
MessageRecvError::MessageHistory(e.into()),
)))
.await;
}
message.from = message.from.as_bare();
from = from.as_bare();
let _ = self
.update_sender
.send(UpdateMessage::Message { to: from, message })
.await;
} else {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::MessageRecv(
MessageRecvError::MissingFrom,
)))
.await;
}
}
Stanza::Presence(presence) => {
if let Some(from) = presence.from {
match presence.r#type {
Some(r#type) => match r#type {
// error processing a presence from somebody
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Presence(
// TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
PresenceError::StanzaError(
presence
.errors
.first()
.cloned()
.expect("error MUST have error"),
),
)))
.await;
}
// should not happen (error to server)
stanza::client::presence::PresenceType::Probe => {
// TODO: should probably write an error and restart stream
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Presence(
PresenceError::Unsupported,
)))
.await;
}
stanza::client::presence::PresenceType::Subscribe => {
// may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
let _ = self
.update_sender
.send(UpdateMessage::SubscriptionRequest(from))
.await;
}
stanza::client::presence::PresenceType::Unavailable => {
let offline = Offline {
status: presence.status.map(|status| status.status.0),
};
let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
let _ = self
.update_sender
.send(UpdateMessage::Presence {
from,
presence: Presence {
timestamp,
presence: PresenceType::Offline(offline),
},
})
.await;
}
// for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
stanza::client::presence::PresenceType::Subscribed => {}
stanza::client::presence::PresenceType::Unsubscribe => {}
stanza::client::presence::PresenceType::Unsubscribed => {}
},
None => {
let online = Online {
show: presence.show.map(|show| match show {
stanza::client::presence::Show::Away => Show::Away,
stanza::client::presence::Show::Chat => Show::Chat,
stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
stanza::client::presence::Show::Xa => Show::ExtendedAway,
}),
status: presence.status.map(|status| status.status.0),
priority: presence.priority.map(|priority| priority.0),
};
let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
let _ = self
.update_sender
.send(UpdateMessage::Presence {
from,
presence: Presence {
timestamp,
presence: PresenceType::Online(online),
},
})
.await;
}
}
} else {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Presence(
PresenceError::MissingFrom,
)))
.await;
}
}
Stanza::Iq(iq) => match iq.r#type {
stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
let send;
{
send = self.pending.lock().await.remove(&iq.id);
}
if let Some(send) = send {
send.send(Ok(Stanza::Iq(iq)));
} else {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
iq.id,
))))
.await;
}
}
// TODO: send unsupported to server
// TODO: proper errors i am so tired please
stanza::client::iq::IqType::Get => {}
stanza::client::iq::IqType::Set => {
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::Roster(mut query) => {
// TODO: there should only be one
if let Some(item) = query.items.pop() {
match item.subscription {
Some(stanza::roster::Subscription::Remove) => {
self.db.delete_contact(item.jid.clone()).await;
self.update_sender
.send(UpdateMessage::RosterDelete(item.jid))
.await;
// TODO: send result
}
_ => {
let contact: Contact = item.into();
if let Err(e) =
self.db.upsert_contact(contact.clone()).await
{
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Roster(
RosterError::Cache(e.into()),
)))
.await;
}
let _ = self
.update_sender
.send(UpdateMessage::RosterUpdate(contact))
.await;
// TODO: send result
// write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
// from: ,
// id: todo!(),
// to: todo!(),
// r#type: todo!(),
// lang: todo!(),
// query: todo!(),
// errors: todo!(),
// }));
}
}
}
}
// TODO: send unsupported to server
_ => {}
}
} else {
// TODO: send error (unsupported) to server
}
}
},
Stanza::Error(error) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::Stream(error)))
.await;
// TODO: reconnect
}
Stanza::OtherContent(content) => {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::UnrecognizedContent));
// TODO: send error to write_thread
}
}
}
async fn handle_online(self, command: Command, connection: Connected) {
match command {
Command::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
debug!("before client_jid lock");
debug!("after client_jid lock");
let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel();
{
self.pending.lock().await.insert(iq_id.clone(), send);
}
let stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: Vec::new(),
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
let _ = connection
.write_handle()
.send(WriteMessage {
stanza,
respond_to: send,
})
.await;
// TODO: timeout
match recv.await {
Ok(Ok(())) => info!("roster request sent"),
Ok(Err(e)) => {
// TODO: log errors if fail to send
let _ = result_sender.send(Err(RosterError::Write(e.into())));
return;
}
Err(e) => {
let _ = result_sender
.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
};
// TODO: timeout
match iq_recv.await {
Ok(Ok(stanza)) => match stanza {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
let contacts: Vec<Contact> =
items.into_iter().map(|item| item.into()).collect();
if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
self.update_sender
.send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
e.into(),
))))
.await;
};
result_sender.send(Ok(contacts));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
result_sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
result_sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Ok(Err(e)) => {
result_sender.send(Err(RosterError::Read(e)));
return;
}
Err(e) => {
result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
Command::GetChats(sender) => {
let chats = self.db.read_chats().await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChatsOrdered(sender) => {
let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChatsOrderedWithLatestMessages(sender) => {
let chats = self
.db
.read_chats_ordered_with_latest_messages()
.await
.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChat(jid, sender) => {
let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetMessages(jid, sender) => {
let messages = self
.db
.read_message_history(jid)
.await
.map_err(|e| e.into());
sender.send(messages);
}
Command::DeleteChat(jid, sender) => {
let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
sender.send(result);
}
Command::DeleteMessage(uuid, sender) => {
let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
sender.send(result);
}
Command::GetUser(jid, sender) => {
let user = self.db.read_user(jid).await.map_err(|e| e.into());
sender.send(user);
}
// TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: None,
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
self.pending.lock().await.insert(iq_id.clone(), send);
}
// TODO: write_handle send helper function
let result = connection.write_handle().write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
Command::BuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
}
}
Command::SubscriptionRequest(jid, sender) => {
// TODO: i should probably have builders
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
}
}
Command::AcceptSubscriptionRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
Command::UnsubscribeContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
Command::UnfriendContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
delay: None,
});
let result = connection.write_handle().write(presence).await;
let _ = sender.send(result);
}
}
}
Command::DeleteContact(jid, sender) => {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: Some(stanza::roster::Subscription::Remove),
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
self.pending.lock().await.insert(iq_id.clone(), send);
}
let result = connection.write_handle().write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
Command::UpdateContact(jid, contact_update, sender) => {
let iq_id = Uuid::new_v4().to_string();
let groups = Vec::from_iter(
contact_update
.groups
.into_iter()
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
from: Some(connection.jid().clone()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: contact_update.name,
subscription: None,
groups,
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
self.pending.lock().await.insert(iq_id.clone(), send);
}
let result = connection.write_handle().write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
Command::SetStatus(online, sender) => {
let result = self.db.upsert_cached_status(online.clone()).await;
if let Err(e) = result {
let _ = self
.update_sender
.send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
e.into(),
))))
.await;
}
let result = connection
.write_handle()
.write(Stanza::Presence(online.into_stanza(None)))
.await
.map_err(|e| StatusError::Write(e));
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
// TODO: offline message queue
Command::SendMessage(jid, body, sender) => {
let id = Uuid::new_v4();
let message = Stanza::Message(stanza::client::message::Message {
from: Some(connection.jid().clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
lang: None,
subject: None,
body: Some(stanza::client::message::Body {
lang: None,
body: Some(body.body.clone()),
}),
thread: None,
delay: None,
});
let _ = sender.send(Ok(()));
// let _ = sender.send(Ok(message.clone()));
let result = connection.write_handle().write(message).await;
match result {
Ok(_) => {
let mut message = Message {
id,
from: connection.jid().clone(),
body,
timestamp: Utc::now(),
};
info!("send message {:?}", message);
if let Err(e) = self
.db
.create_message_with_self_resource_and_chat(
message.clone(),
jid.clone(),
)
.await
.map_err(|e| e.into())
{
tracing::error!("{}", e);
let _ =
self.update_sender
.send(UpdateMessage::Error(Error::MessageSend(
error::MessageSendError::MessageHistory(e),
)));
}
// TODO: don't do this, have separate from from details
message.from = message.from.as_bare();
let _ = self
.update_sender
.send(UpdateMessage::Message { to: jid, message })
.await;
}
Err(_) => {
// let _ = sender.send(result);
}
}
}
Command::SendPresence(jid, presence, sender) => {
let mut presence: stanza::client::presence::Presence = presence.into();
if let Some(jid) = jid {
presence.to = Some(jid);
};
let result = connection
.write_handle()
.write(Stanza::Presence(presence))
.await;
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
}
}
async fn handle_offline(self, command: Command) {
match command {
Command::GetRoster(sender) => {
let roster = self.db.read_cached_roster().await;
match roster {
Ok(roster) => {
let _ = sender.send(Ok(roster));
}
Err(e) => {
let _ = sender.send(Err(RosterError::Cache(e.into())));
}
}
}
Command::GetChats(sender) => {
let chats = self.db.read_chats().await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChatsOrdered(sender) => {
let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChatsOrderedWithLatestMessages(sender) => {
let chats = self
.db
.read_chats_ordered_with_latest_messages()
.await
.map_err(|e| e.into());
sender.send(chats);
}
Command::GetChat(jid, sender) => {
let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
sender.send(chats);
}
Command::GetMessages(jid, sender) => {
let messages = self
.db
.read_message_history(jid)
.await
.map_err(|e| e.into());
sender.send(messages);
}
Command::DeleteChat(jid, sender) => {
let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
sender.send(result);
}
Command::DeleteMessage(uuid, sender) => {
let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
sender.send(result);
}
Command::GetUser(jid, sender) => {
let user = self.db.read_user(jid).await.map_err(|e| e.into());
sender.send(user);
}
// TODO: offline queue to modify roster
Command::AddContact(_jid, sender) => {
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
Command::BuddyRequest(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::SubscriptionRequest(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::AcceptBuddyRequest(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::AcceptSubscriptionRequest(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::UnsubscribeFromContact(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::UnsubscribeContact(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::UnfriendContact(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::DeleteContact(_jid, sender) => {
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
Command::UpdateContact(_jid, _contact_update, sender) => {
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
Command::SetStatus(online, sender) => {
let result = self
.db
.upsert_cached_status(online)
.await
.map_err(|e| StatusError::Cache(e.into()));
sender.send(result);
}
// TODO: offline message queue
Command::SendMessage(_jid, _body, sender) => {
sender.send(Err(WriteError::Disconnected));
}
Command::SendPresence(_jid, _presence, sender) => {
sender.send(Err(WriteError::Disconnected));
}
}
}
// pub async fn handle_stream_error(self, error) {}
// stanza errors (recoverable)
// pub async fn handle_error(self, error: Error) {}
// when it aborts, must clear iq map no matter what
async fn on_abort(self) {
let mut iqs = self.pending.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(ReadError::LostConnection));
}
}
async fn handle_connection_error(self, error: ConnectionError) {
self.update_sender
.send(UpdateMessage::Error(
ConnectionError::AlreadyConnected.into(),
))
.await;
}
}
impl From<Command> for CoreClientCommand<Command> {
fn from(value: Command) -> Self {
CoreClientCommand::Command(value)
}
}
#[derive(Debug, Clone)]
pub enum UpdateMessage {
Error(Error),
Online(Online, Vec<Contact>),
Offline(Offline),
/// received roster from jabber server (replace full app roster state with this)
/// is this needed?
FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
RosterUpdate(Contact),
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
from: JID,
presence: Presence,
},
// TODO: receipts
// MessageDispatched(Uuid),
Message {
to: JID,
message: Message,
},
SubscriptionRequest(jid::JID),
}