use std::{
collections::HashMap,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
time::Duration,
};
use chat::{Body, Chat, Message};
use chrono::Utc;
use db::Db;
use error::{
ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
RosterError, StatusError,
};
use futures::FutureExt;
use jid::JID;
use lampada::{
Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
};
use logic::ClientLogic;
use presence::{Offline, Online, Presence, PresenceType, Show};
use roster::{Contact, ContactUpdate};
use stanza::client::{
Stanza,
iq::{self, Iq, IqType},
};
use tokio::{
sync::{Mutex, mpsc, oneshot},
time::timeout,
};
use tracing::{debug, info};
use user::User;
use uuid::Uuid;
pub mod chat;
pub mod db;
pub mod error;
mod logic;
pub mod presence;
pub mod roster;
pub mod user;
pub enum Command {
/// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send presence stanza
// TODO: cache presence stanza
SendPresence(
Option<JID>,
PresenceType,
oneshot::Sender<Result<(), WriteError>>,
),
/// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
/// an xmpp client that is suited for a chat client use case
#[derive(Debug)]
pub struct Client {
sender: mpsc::Sender<CoreClientCommand<Command>>,
timeout: Duration,
}
impl Clone for Client {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
timeout: self.timeout,
}
}
}
impl Deref for Client {
type Target = mpsc::Sender<CoreClientCommand<Command>>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl Client {
pub async fn connect(&self) -> Result<(), ActorError> {
self.send(CoreClientCommand::Connect).await?;
Ok(())
}
pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
self.send(CoreClientCommand::Disconnect).await?;
Ok(())
}
pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_send, update_recv) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
let (_sup_send, sup_recv) = oneshot::channel();
let sup_recv = sup_recv.fuse();
let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send);
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
(
Self {
sender: command_sender,
// TODO: configure timeout
timeout: Duration::from_secs(10),
},
update_recv,
)
}
pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetRoster(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let roster = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(roster)
}
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChats(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chats_ordered_with_latest_messages(
&self,
) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
Command::GetChatsOrderedWithLatestMessages(send),
))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chats = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chats)
}
pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let chat = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(chat)
}
pub async fn get_messages(
&self,
jid: JID,
) -> Result<Vec<Message>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let messages = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(messages)
}
pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn accept_subscription_request(
&self,
jid: JID,
) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
Command::AcceptSubscriptionRequest(jid, send),
))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnfriendContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteContact(
jid, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn update_contact(
&self,
jid: JID,
update: ContactUpdate,
) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UpdateContact(
jid, update, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SendMessage(
jid, body, send,
)))
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
let result = timeout(self.timeout, recv)
.await
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
}
impl From<Command> for CoreClientCommand<Command> {
fn from(value: Command) -> Self {
CoreClientCommand::Command(value)
}
}
#[derive(Debug, Clone)]
pub enum UpdateMessage {
Error(Error),
Online(Online, Vec<Contact>),
Offline(Offline),
/// received roster from jabber server (replace full app roster state with this)
/// is this needed?
FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
RosterUpdate(Contact),
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
from: JID,
presence: Presence,
},
// TODO: receipts
// MessageDispatched(Uuid),
Message {
to: JID,
message: Message,
},
SubscriptionRequest(jid::JID),
}