aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/lib.rs')
-rw-r--r--filamento/src/lib.rs1598
1 files changed, 1598 insertions, 0 deletions
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
new file mode 100644
index 0000000..db59a67
--- /dev/null
+++ b/filamento/src/lib.rs
@@ -0,0 +1,1598 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use chat::{Body, Chat, Message};
+use chrono::Utc;
+use db::Db;
+use error::{
+ ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
+ RosterError, StatusError,
+};
+use futures::FutureExt;
+use jid::JID;
+use lampada::{
+ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
+ error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
+};
+use presence::{Offline, Online, Presence, PresenceType, Show};
+use roster::{Contact, ContactUpdate};
+use stanza::client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+};
+use tokio::{
+ sync::{Mutex, mpsc, oneshot},
+ time::timeout,
+};
+use tracing::{debug, info};
+use user::User;
+use uuid::Uuid;
+
+pub mod chat;
+pub mod db;
+pub mod error;
+pub mod presence;
+pub mod roster;
+pub mod user;
+
+pub enum Command {
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory
+ GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
+ // TODO: paging and filtering
+ GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ /// get a specific chat by jid
+ GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// delete a chat from your chat history, along with all the corresponding messages
+ DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
+ /// delete a message from your chat history
+ DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
+ /// get a user from your users database
+ GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
+ /// add a contact to your roster, with a status of none, no subscriptions.
+ AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
+ BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// unsubscribe to a contact, but don't remove their subscription.
+ UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// stop a contact from being subscribed, but stay subscribed to the contact.
+ UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove subscriptions to and from contact, but keep in roster.
+ UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
+ DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
+ UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
+ /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
+ SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
+ /// send presence stanza
+ // TODO: cache presence stanza
+ SendPresence(
+ Option<JID>,
+ PresenceType,
+ oneshot::Sender<Result<(), WriteError>>,
+ ),
+ /// send a directed presence (usually to a non-contact).
+ // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
+ /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
+ /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
+ SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
+}
+/// an xmpp client that is suited for a chat client use case
+#[derive(Debug)]
+pub struct Client {
+ sender: mpsc::Sender<CoreClientCommand<Command>>,
+ timeout: Duration,
+}
+
+impl Clone for Client {
+ fn clone(&self) -> Self {
+ Self {
+ sender: self.sender.clone(),
+ timeout: self.timeout,
+ }
+ }
+}
+
+impl Deref for Client {
+ type Target = mpsc::Sender<CoreClientCommand<Command>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for Client {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl Client {
+ pub async fn connect(&self) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Connect).await?;
+ Ok(())
+ }
+
+ pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Disconnect).await?;
+ Ok(())
+ }
+
+ pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (update_send, update_recv) = mpsc::channel(20);
+
+ // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
+ let (_sup_send, sup_recv) = oneshot::channel();
+ let sup_recv = sup_recv.fuse();
+
+ let logic = ClientLogic {
+ db,
+ pending: Arc::new(Mutex::new(HashMap::new())),
+ update_sender: update_send,
+ };
+
+ let actor: CoreClient<ClientLogic> =
+ CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
+ tokio::spawn(async move { actor.run().await });
+
+ (
+ Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ },
+ update_recv,
+ )
+ }
+
+ pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRoster(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
+ pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChats(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessages(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chat = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chat)
+ }
+
+ pub async fn get_messages(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
+ pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_subscription_request(
+ &self,
+ jid: JID,
+ ) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::AcceptSubscriptionRequest(jid, send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnfriendContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn update_contact(
+ &self,
+ jid: JID,
+ update: ContactUpdate,
+ ) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UpdateContact(
+ jid, update, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SendMessage(
+ jid, body, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+}
+
+#[derive(Clone)]
+pub struct ClientLogic {
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+}
+
+impl Logic for ClientLogic {
+ type Cmd = Command;
+
+ async fn handle_connect(self, connection: Connected) {
+ let (send, recv) = oneshot::channel();
+ debug!("getting roster");
+ self.clone()
+ .handle_online(Command::GetRoster(send), connection.clone())
+ .await;
+ debug!("sent roster req");
+ let roster = recv.await;
+ debug!("got roster");
+ match roster {
+ Ok(r) => match r {
+ Ok(roster) => {
+ let online = self.db.read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::StatusCacheError(e.into()),
+ )))
+ .await;
+ Online::default()
+ }
+ };
+ let (send, recv) = oneshot::channel();
+ self.clone()
+ .handle_online(
+ Command::SendPresence(None, PresenceType::Online(online.clone()), send),
+ connection,
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Online(online, roster))
+ .await;
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
+ )))
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ e.into(),
+ ))),
+ )))
+ .await;
+ }
+ }
+ }
+
+ async fn handle_disconnect(self, connection: Connected) {
+ // TODO: be able to set offline status message
+ let offline_presence: stanza::client::presence::Presence =
+ Offline::default().into_stanza(None);
+ let stanza = Stanza::Presence(offline_presence);
+ // TODO: timeout and error check
+ connection.write_handle().write(stanza).await;
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Offline(Offline::default()))
+ .await;
+ }
+
+ async fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) {
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = self
+ .db
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ tracing::error!("messagecreate");
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
+ .await;
+ }
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
+ },
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = self.pending.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ self.db.delete_contact(item.jid.clone()).await;
+ self.update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) =
+ self.db.upsert_contact(contact.clone()).await
+ {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
+ .await;
+ }
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ // TODO: send error to write_thread
+ }
+ }
+ }
+
+ async fn handle_online(self, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
+ // TODO: jid resource should probably be stored within the connection
+ debug!("before client_jid lock");
+ debug!("after client_jid lock");
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = connection
+ .write_handle()
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ // TODO: timeout
+ match recv.await {
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ = result_sender
+ .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ };
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
+ self.update_sender
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
+ .await;
+ };
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError::Read(e)));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ // TODO: write_handle send helper function
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::BuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SubscriptionRequest(jid, sender) => {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::AcceptBuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::AcceptSubscriptionRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeFromContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnfriendContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::DeleteContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::UpdateContact(jid, contact_update, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self.db.upsert_cached_status(online.clone()).await;
+ if let Err(e) = result {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
+ .await;
+ }
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(jid, body, sender) => {
+ let id = Uuid::new_v4();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ delay: None,
+ });
+ let _ = sender.send(Ok(()));
+ // let _ = sender.send(Ok(message.clone()));
+ let result = connection.write_handle().write(message).await;
+ match result {
+ Ok(_) => {
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp: Utc::now(),
+ };
+ info!("send message {:?}", message);
+ if let Err(e) = self
+ .db
+ .create_message_with_self_resource_and_chat(
+ message.clone(),
+ jid.clone(),
+ )
+ .await
+ .map_err(|e| e.into())
+ {
+ tracing::error!("{}", e);
+ let _ =
+ self.update_sender
+ .send(UpdateMessage::Error(Error::MessageSend(
+ error::MessageSendError::MessageHistory(e),
+ )));
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ }
+ Err(_) => {
+ // let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ }
+ }
+
+ async fn handle_offline(self, command: Command) {
+ match command {
+ Command::GetRoster(sender) => {
+ let roster = self.db.read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::BuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptBuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptSubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeFromContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnfriendContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::DeleteContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::UpdateContact(_jid, _contact_update, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self
+ .db
+ .upsert_cached_status(online)
+ .await
+ .map_err(|e| StatusError::Cache(e.into()));
+ sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(_jid, _body, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SendPresence(_jid, _presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ }
+ }
+ // pub async fn handle_stream_error(self, error) {}
+ // stanza errors (recoverable)
+ // pub async fn handle_error(self, error: Error) {}
+ // when it aborts, must clear iq map no matter what
+ async fn on_abort(self) {
+ let mut iqs = self.pending.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+
+ async fn handle_connection_error(self, error: ConnectionError) {
+ self.update_sender
+ .send(UpdateMessage::Error(
+ ConnectionError::AlreadyConnected.into(),
+ ))
+ .await;
+ }
+}
+
+impl From<Command> for CoreClientCommand<Command> {
+ fn from(value: Command) -> Self {
+ CoreClientCommand::Command(value)
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum UpdateMessage {
+ Error(Error),
+ Online(Online, Vec<Contact>),
+ Offline(Offline),
+ /// received roster from jabber server (replace full app roster state with this)
+ /// is this needed?
+ FullRoster(Vec<Contact>),
+ /// (only update app roster state, don't replace)
+ RosterUpdate(Contact),
+ RosterDelete(JID),
+ /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
+ Presence {
+ from: JID,
+ presence: Presence,
+ },
+ // TODO: receipts
+ // MessageDispatched(Uuid),
+ Message {
+ to: JID,
+ message: Message,
+ },
+ SubscriptionRequest(jid::JID),
+}