diff options
Diffstat (limited to '')
| -rw-r--r-- | filamento/Cargo.toml | 17 | ||||
| -rw-r--r-- | filamento/README.md | 3 | ||||
| -rw-r--r-- | filamento/filamento.db | bin | 0 -> 90112 bytes | |||
| -rw-r--r-- | filamento/migrations/20240113011930_luz.sql (renamed from luz/migrations/20240113011930_luz.sql) | 0 | ||||
| -rw-r--r-- | filamento/src/chat.rs (renamed from luz/src/chat.rs) | 0 | ||||
| -rw-r--r-- | filamento/src/db.rs (renamed from luz/src/db/mod.rs) | 0 | ||||
| -rw-r--r-- | filamento/src/error.rs | 142 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 1598 | ||||
| -rw-r--r-- | filamento/src/presence.rs (renamed from luz/src/presence.rs) | 0 | ||||
| -rw-r--r-- | filamento/src/roster.rs (renamed from luz/src/roster.rs) | 0 | ||||
| -rw-r--r-- | filamento/src/user.rs (renamed from luz/src/user.rs) | 0 | 
11 files changed, 1760 insertions, 0 deletions
| diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml new file mode 100644 index 0000000..e25024a --- /dev/null +++ b/filamento/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "filamento" +version = "0.1.0" +edition = "2024" + +[dependencies] +futures = "0.3.31" +lampada = { version = "0.1.0", path = "../lampada" } +tokio = "1.42.0" +thiserror = "2.0.11" +stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } +sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } +# TODO: re-export jid? +jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } +uuid = { version = "1.13.1", features = ["v4"] } +tracing = "0.1.41" +chrono = "0.4.40" diff --git a/filamento/README.md b/filamento/README.md new file mode 100644 index 0000000..57b4135 --- /dev/null +++ b/filamento/README.md @@ -0,0 +1,3 @@ +# filament + +a high-level xmpp chat client using luz diff --git a/filamento/filamento.db b/filamento/filamento.dbBinary files differ new file mode 100644 index 0000000..5c3c720 --- /dev/null +++ b/filamento/filamento.db diff --git a/luz/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 148598b..148598b 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql diff --git a/luz/src/chat.rs b/filamento/src/chat.rs index c1194ea..c1194ea 100644 --- a/luz/src/chat.rs +++ b/filamento/src/chat.rs diff --git a/luz/src/db/mod.rs b/filamento/src/db.rs index aea40ac..aea40ac 100644 --- a/luz/src/db/mod.rs +++ b/filamento/src/db.rs diff --git a/filamento/src/error.rs b/filamento/src/error.rs new file mode 100644 index 0000000..996a503 --- /dev/null +++ b/filamento/src/error.rs @@ -0,0 +1,142 @@ +use std::sync::Arc; + +use lampada::error::{ConnectionError, ReadError, WriteError}; +use stanza::client::Stanza; +use thiserror::Error; + +pub use lampada::error::CommandError; + +// for the client logic impl +#[derive(Debug, Error, Clone)] +pub enum Error { +    #[error("core error: {0}")] +    Connection(#[from] ConnectionError), +    #[error("received unrecognized/unsupported content")] +    UnrecognizedContent, +    // TODO: include content +    // UnrecognizedContent(peanuts::element::Content), +    #[error("iq receive error: {0}")] +    Iq(IqError), +    // TODO: change to Connecting(ConnectingError) +    #[error("connecting: {0}")] +    Connecting(#[from] ConnectionJobError), +    #[error("presence: {0}")] +    Presence(#[from] PresenceError), +    #[error("set status: {0}")] +    SetStatus(#[from] StatusError), +    // TODO: have different ones for get/update/set +    #[error("roster: {0}")] +    Roster(RosterError), +    #[error("stream error: {0}")] +    Stream(#[from] stanza::stream::Error), +    #[error("message send error: {0}")] +    MessageSend(MessageSendError), +    #[error("message receive error: {0}")] +    MessageRecv(MessageRecvError), +} + +#[derive(Debug, Error, Clone)] +pub enum MessageSendError { +    #[error("could not add to message history: {0}")] +    MessageHistory(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)] +pub enum MessageRecvError { +    #[error("could not add to message history: {0}")] +    MessageHistory(#[from] DatabaseError), +    #[error("missing from")] +    MissingFrom, +} + +#[derive(Debug, Error, Clone)] +pub enum StatusError { +    #[error("cache: {0}")] +    Cache(#[from] DatabaseError), +    #[error("stream write: {0}")] +    Write(#[from] WriteError), +} + +#[derive(Debug, Clone, Error)] +pub enum ConnectionJobError { +    // #[error("connection failed: {0}")] +    // ConnectionFailed(#[from] luz::Error), +    #[error("failed roster retreival: {0}")] +    RosterRetreival(#[from] RosterError), +    #[error("failed to send available presence: {0}")] +    SendPresence(#[from] WriteError), +    #[error("cached status: {0}")] +    StatusCacheError(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)] +pub enum RosterError { +    #[error("cache: {0}")] +    Cache(#[from] DatabaseError), +    #[error("stream write: {0}")] +    Write(#[from] WriteError), +    // TODO: display for stanza, to show as xml, same for read error types. +    #[error("unexpected reply: {0:?}")] +    UnexpectedStanza(Stanza), +    #[error("stream read: {0}")] +    Read(#[from] ReadError), +    #[error("stanza error: {0}")] +    StanzaError(#[from] stanza::client::error::Error), +} + +#[derive(Debug, Error, Clone)] +#[error("database error: {0}")] +pub struct DatabaseError(Arc<sqlx::Error>); + +impl From<sqlx::Error> for DatabaseError { +    fn from(e: sqlx::Error) -> Self { +        Self(Arc::new(e)) +    } +} + +impl From<sqlx::Error> for DatabaseOpenError { +    fn from(e: sqlx::Error) -> Self { +        Self::Error(Arc::new(e)) +    } +} + +#[derive(Debug, Error, Clone)] +// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc. +pub enum IqError { +    #[error("no iq with id matching `{0}`")] +    NoMatchingId(String), +} + +#[derive(Debug, Error, Clone)] +pub enum DatabaseOpenError { +    #[error("error: {0}")] +    Error(Arc<sqlx::Error>), +    #[error("migration: {0}")] +    Migration(Arc<sqlx::migrate::MigrateError>), +    #[error("io: {0}")] +    Io(Arc<tokio::io::Error>), +    #[error("invalid path")] +    InvalidPath, +} + +impl From<sqlx::migrate::MigrateError> for DatabaseOpenError { +    fn from(e: sqlx::migrate::MigrateError) -> Self { +        Self::Migration(Arc::new(e)) +    } +} + +impl From<tokio::io::Error> for DatabaseOpenError { +    fn from(e: tokio::io::Error) -> Self { +        Self::Io(Arc::new(e)) +    } +} + +#[derive(Debug, Error, Clone)] +pub enum PresenceError { +    #[error("unsupported")] +    Unsupported, +    #[error("missing from")] +    MissingFrom, +    #[error("stanza error: {0}")] +    StanzaError(#[from] stanza::client::error::Error), +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs new file mode 100644 index 0000000..db59a67 --- /dev/null +++ b/filamento/src/lib.rs @@ -0,0 +1,1598 @@ +use std::{ +    collections::HashMap, +    ops::{Deref, DerefMut}, +    str::FromStr, +    sync::Arc, +    time::Duration, +}; + +use chat::{Body, Chat, Message}; +use chrono::Utc; +use db::Db; +use error::{ +    ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, +    RosterError, StatusError, +}; +use futures::FutureExt; +use jid::JID; +use lampada::{ +    Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, +    error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, +}; +use presence::{Offline, Online, Presence, PresenceType, Show}; +use roster::{Contact, ContactUpdate}; +use stanza::client::{ +    Stanza, +    iq::{self, Iq, IqType}, +}; +use tokio::{ +    sync::{Mutex, mpsc, oneshot}, +    time::timeout, +}; +use tracing::{debug, info}; +use user::User; +use uuid::Uuid; + +pub mod chat; +pub mod db; +pub mod error; +pub mod presence; +pub mod roster; +pub mod user; + +pub enum Command { +    /// get the roster. if offline, retreive cached version from database. should be stored in application memory +    GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), +    /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) +    // TODO: paging and filtering +    GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), +    // TODO: paging and filtering +    GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), +    // TODO: paging and filtering +    GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), +    /// get a specific chat by jid +    GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), +    /// get message history for chat (does appropriate mam things) +    // TODO: paging and filtering +    GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), +    /// delete a chat from your chat history, along with all the corresponding messages +    DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), +    /// delete a message from your chat history +    DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>), +    /// get a user from your users database +    GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>), +    /// add a contact to your roster, with a status of none, no subscriptions. +    AddContact(JID, oneshot::Sender<Result<(), RosterError>>), +    /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. +    BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. +    SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. +    AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. +    AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    /// unsubscribe to a contact, but don't remove their subscription. +    UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>), +    /// stop a contact from being subscribed, but stay subscribed to the contact. +    UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>), +    /// remove subscriptions to and from contact, but keep in roster. +    UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>), +    /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. +    DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>), +    /// update contact. contact details will be overwritten with the contents of the contactupdate struct. +    UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>), +    /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. +    SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), +    /// send presence stanza +    // TODO: cache presence stanza +    SendPresence( +        Option<JID>, +        PresenceType, +        oneshot::Sender<Result<(), WriteError>>, +    ), +    /// send a directed presence (usually to a non-contact). +    // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) +    /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a +    /// chatroom). if disconnected, will be cached so when client connects, message will be sent. +    SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), +} +/// an xmpp client that is suited for a chat client use case +#[derive(Debug)] +pub struct Client { +    sender: mpsc::Sender<CoreClientCommand<Command>>, +    timeout: Duration, +} + +impl Clone for Client { +    fn clone(&self) -> Self { +        Self { +            sender: self.sender.clone(), +            timeout: self.timeout, +        } +    } +} + +impl Deref for Client { +    type Target = mpsc::Sender<CoreClientCommand<Command>>; + +    fn deref(&self) -> &Self::Target { +        &self.sender +    } +} + +impl DerefMut for Client { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.sender +    } +} + +impl Client { +    pub async fn connect(&self) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Connect).await?; +        Ok(()) +    } + +    pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Disconnect).await?; +        Ok(()) +    } + +    pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { +        let (command_sender, command_receiver) = mpsc::channel(20); +        let (update_send, update_recv) = mpsc::channel(20); + +        // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) +        let (_sup_send, sup_recv) = oneshot::channel(); +        let sup_recv = sup_recv.fuse(); + +        let logic = ClientLogic { +            db, +            pending: Arc::new(Mutex::new(HashMap::new())), +            update_sender: update_send, +        }; + +        let actor: CoreClient<ClientLogic> = +            CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); +        tokio::spawn(async move { actor.run().await }); + +        ( +            Self { +                sender: command_sender, +                // TODO: configure timeout +                timeout: Duration::from_secs(10), +            }, +            update_recv, +        ) +    } + +    pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetRoster(send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let roster = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(roster) +    } + +    pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetChats(send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let chats = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(chats) +    } + +    pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let chats = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(chats) +    } + +    pub async fn get_chats_ordered_with_latest_messages( +        &self, +    ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command( +            Command::GetChatsOrderedWithLatestMessages(send), +        )) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let chats = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(chats) +    } + +    pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetChat(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let chat = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(chat) +    } + +    pub async fn get_messages( +        &self, +        jid: JID, +    ) -> Result<Vec<Message>, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetMessages(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let messages = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(messages) +    } + +    pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetUser(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::AddContact(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::SubscriptionRequest( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn accept_subscription_request( +        &self, +        jid: JID, +    ) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command( +            Command::AcceptSubscriptionRequest(jid, send), +        )) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::UnsubscribeContact( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::UnfriendContact( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::DeleteContact( +            jid, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn update_contact( +        &self, +        jid: JID, +        update: ContactUpdate, +    ) -> Result<(), CommandError<RosterError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::UpdateContact( +            jid, update, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::SetStatus(online, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::SendMessage( +            jid, body, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } +} + +#[derive(Clone)] +pub struct ClientLogic { +    db: Db, +    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, +    update_sender: mpsc::Sender<UpdateMessage>, +} + +impl Logic for ClientLogic { +    type Cmd = Command; + +    async fn handle_connect(self, connection: Connected) { +        let (send, recv) = oneshot::channel(); +        debug!("getting roster"); +        self.clone() +            .handle_online(Command::GetRoster(send), connection.clone()) +            .await; +        debug!("sent roster req"); +        let roster = recv.await; +        debug!("got roster"); +        match roster { +            Ok(r) => match r { +                Ok(roster) => { +                    let online = self.db.read_cached_status().await; +                    let online = match online { +                        Ok(online) => online, +                        Err(e) => { +                            let _ = self +                                .update_sender +                                .send(UpdateMessage::Error(Error::Connecting( +                                    ConnectionJobError::StatusCacheError(e.into()), +                                ))) +                                .await; +                            Online::default() +                        } +                    }; +                    let (send, recv) = oneshot::channel(); +                    self.clone() +                        .handle_online( +                            Command::SendPresence(None, PresenceType::Online(online.clone()), send), +                            connection, +                        ) +                        .await; +                    let set_status = recv.await; +                    match set_status { +                        Ok(s) => match s { +                            Ok(()) => { +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::Online(online, roster)) +                                    .await; +                            } +                            Err(e) => { +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::Error(Error::Connecting(e.into()))) +                                    .await; +                            } +                        }, +                        Err(e) => { +                            let _ = self +                                .update_sender +                                .send(UpdateMessage::Error(Error::Connecting( +                                    ConnectionJobError::SendPresence(WriteError::Actor(e.into())), +                                ))) +                                .await; +                        } +                    } +                } +                Err(e) => { +                    let _ = self +                        .update_sender +                        .send(UpdateMessage::Error(Error::Connecting(e.into()))) +                        .await; +                } +            }, +            Err(e) => { +                let _ = self +                    .update_sender +                    .send(UpdateMessage::Error(Error::Connecting( +                        ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( +                            e.into(), +                        ))), +                    ))) +                    .await; +            } +        } +    } + +    async fn handle_disconnect(self, connection: Connected) { +        // TODO: be able to set offline status message +        let offline_presence: stanza::client::presence::Presence = +            Offline::default().into_stanza(None); +        let stanza = Stanza::Presence(offline_presence); +        // TODO: timeout and error check +        connection.write_handle().write(stanza).await; +        let _ = self +            .update_sender +            .send(UpdateMessage::Offline(Offline::default())) +            .await; +    } + +    async fn handle_stanza( +        self, +        stanza: Stanza, +        connection: Connected, +        supervisor: SupervisorSender, +    ) { +        match stanza { +            Stanza::Message(stanza_message) => { +                if let Some(mut from) = stanza_message.from { +                    // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. +                    let timestamp = stanza_message +                        .delay +                        .map(|delay| delay.stamp) +                        .unwrap_or_else(|| Utc::now()); +                    // TODO: group chat messages +                    let mut message = Message { +                        id: stanza_message +                            .id +                            // TODO: proper id storage +                            .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                            .unwrap_or_else(|| Uuid::new_v4()), +                        from: from.clone(), +                        timestamp, +                        body: Body { +                            // TODO: should this be an option? +                            body: stanza_message +                                .body +                                .map(|body| body.body) +                                .unwrap_or_default() +                                .unwrap_or_default(), +                        }, +                    }; +                    // TODO: can this be more efficient? +                    let result = self +                        .db +                        .create_message_with_user_resource_and_chat(message.clone(), from.clone()) +                        .await; +                    if let Err(e) = result { +                        tracing::error!("messagecreate"); +                        let _ = self +                            .update_sender +                            .send(UpdateMessage::Error(Error::MessageRecv( +                                MessageRecvError::MessageHistory(e.into()), +                            ))) +                            .await; +                    } +                    message.from = message.from.as_bare(); +                    from = from.as_bare(); +                    let _ = self +                        .update_sender +                        .send(UpdateMessage::Message { to: from, message }) +                        .await; +                } else { +                    let _ = self +                        .update_sender +                        .send(UpdateMessage::Error(Error::MessageRecv( +                            MessageRecvError::MissingFrom, +                        ))) +                        .await; +                } +            } +            Stanza::Presence(presence) => { +                if let Some(from) = presence.from { +                    match presence.r#type { +                        Some(r#type) => match r#type { +                            // error processing a presence from somebody +                            stanza::client::presence::PresenceType::Error => { +                                // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::Error(Error::Presence( +                                        // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display +                                        PresenceError::StanzaError( +                                            presence +                                                .errors +                                                .first() +                                                .cloned() +                                                .expect("error MUST have error"), +                                        ), +                                    ))) +                                    .await; +                            } +                            // should not happen (error to server) +                            stanza::client::presence::PresenceType::Probe => { +                                // TODO: should probably write an error and restart stream +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::Error(Error::Presence( +                                        PresenceError::Unsupported, +                                    ))) +                                    .await; +                            } +                            stanza::client::presence::PresenceType::Subscribe => { +                                // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::SubscriptionRequest(from)) +                                    .await; +                            } +                            stanza::client::presence::PresenceType::Unavailable => { +                                let offline = Offline { +                                    status: presence.status.map(|status| status.status.0), +                                }; +                                let timestamp = presence +                                    .delay +                                    .map(|delay| delay.stamp) +                                    .unwrap_or_else(|| Utc::now()); +                                let _ = self +                                    .update_sender +                                    .send(UpdateMessage::Presence { +                                        from, +                                        presence: Presence { +                                            timestamp, +                                            presence: PresenceType::Offline(offline), +                                        }, +                                    }) +                                    .await; +                            } +                            // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. +                            stanza::client::presence::PresenceType::Subscribed => {} +                            stanza::client::presence::PresenceType::Unsubscribe => {} +                            stanza::client::presence::PresenceType::Unsubscribed => {} +                        }, +                        None => { +                            let online = Online { +                                show: presence.show.map(|show| match show { +                                    stanza::client::presence::Show::Away => Show::Away, +                                    stanza::client::presence::Show::Chat => Show::Chat, +                                    stanza::client::presence::Show::Dnd => Show::DoNotDisturb, +                                    stanza::client::presence::Show::Xa => Show::ExtendedAway, +                                }), +                                status: presence.status.map(|status| status.status.0), +                                priority: presence.priority.map(|priority| priority.0), +                            }; +                            let timestamp = presence +                                .delay +                                .map(|delay| delay.stamp) +                                .unwrap_or_else(|| Utc::now()); +                            let _ = self +                                .update_sender +                                .send(UpdateMessage::Presence { +                                    from, +                                    presence: Presence { +                                        timestamp, +                                        presence: PresenceType::Online(online), +                                    }, +                                }) +                                .await; +                        } +                    } +                } else { +                    let _ = self +                        .update_sender +                        .send(UpdateMessage::Error(Error::Presence( +                            PresenceError::MissingFrom, +                        ))) +                        .await; +                } +            } +            Stanza::Iq(iq) => match iq.r#type { +                stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { +                    let send; +                    { +                        send = self.pending.lock().await.remove(&iq.id); +                    } +                    if let Some(send) = send { +                        send.send(Ok(Stanza::Iq(iq))); +                    } else { +                        let _ = self +                            .update_sender +                            .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( +                                iq.id, +                            )))) +                            .await; +                    } +                } +                // TODO: send unsupported to server +                // TODO: proper errors i am so tired please +                stanza::client::iq::IqType::Get => {} +                stanza::client::iq::IqType::Set => { +                    if let Some(query) = iq.query { +                        match query { +                            stanza::client::iq::Query::Roster(mut query) => { +                                // TODO: there should only be one +                                if let Some(item) = query.items.pop() { +                                    match item.subscription { +                                        Some(stanza::roster::Subscription::Remove) => { +                                            self.db.delete_contact(item.jid.clone()).await; +                                            self.update_sender +                                                .send(UpdateMessage::RosterDelete(item.jid)) +                                                .await; +                                            // TODO: send result +                                        } +                                        _ => { +                                            let contact: Contact = item.into(); +                                            if let Err(e) = +                                                self.db.upsert_contact(contact.clone()).await +                                            { +                                                let _ = self +                                                    .update_sender +                                                    .send(UpdateMessage::Error(Error::Roster( +                                                        RosterError::Cache(e.into()), +                                                    ))) +                                                    .await; +                                            } +                                            let _ = self +                                                .update_sender +                                                .send(UpdateMessage::RosterUpdate(contact)) +                                                .await; +                                            // TODO: send result +                                            // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { +                                            //     from: , +                                            //     id: todo!(), +                                            //     to: todo!(), +                                            //     r#type: todo!(), +                                            //     lang: todo!(), +                                            //     query: todo!(), +                                            //     errors: todo!(), +                                            // })); +                                        } +                                    } +                                } +                            } +                            // TODO: send unsupported to server +                            _ => {} +                        } +                    } else { +                        // TODO: send error (unsupported) to server +                    } +                } +            }, +            Stanza::Error(error) => { +                let _ = self +                    .update_sender +                    .send(UpdateMessage::Error(Error::Stream(error))) +                    .await; +                // TODO: reconnect +            } +            Stanza::OtherContent(content) => { +                let _ = self +                    .update_sender +                    .send(UpdateMessage::Error(Error::UnrecognizedContent)); +                // TODO: send error to write_thread +            } +        } +    } + +    async fn handle_online(self, command: Command, connection: Connected) { +        match command { +            Command::GetRoster(result_sender) => { +                // TODO: jid resource should probably be stored within the connection +                debug!("before client_jid lock"); +                debug!("after client_jid lock"); +                let iq_id = Uuid::new_v4().to_string(); +                let (send, iq_recv) = oneshot::channel(); +                { +                    self.pending.lock().await.insert(iq_id.clone(), send); +                } +                let stanza = Stanza::Iq(Iq { +                    from: Some(connection.jid().clone()), +                    id: iq_id.to_string(), +                    to: None, +                    r#type: IqType::Get, +                    lang: None, +                    query: Some(iq::Query::Roster(stanza::roster::Query { +                        ver: None, +                        items: Vec::new(), +                    })), +                    errors: Vec::new(), +                }); +                let (send, recv) = oneshot::channel(); +                let _ = connection +                    .write_handle() +                    .send(WriteMessage { +                        stanza, +                        respond_to: send, +                    }) +                    .await; +                // TODO: timeout +                match recv.await { +                    Ok(Ok(())) => info!("roster request sent"), +                    Ok(Err(e)) => { +                        // TODO: log errors if fail to send +                        let _ = result_sender.send(Err(RosterError::Write(e.into()))); +                        return; +                    } +                    Err(e) => { +                        let _ = result_sender +                            .send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                        return; +                    } +                }; +                // TODO: timeout +                match iq_recv.await { +                    Ok(Ok(stanza)) => match stanza { +                        Stanza::Iq(Iq { +                            from: _, +                            id, +                            to: _, +                            r#type, +                            lang: _, +                            query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), +                            errors: _, +                        }) if id == iq_id && r#type == IqType::Result => { +                            let contacts: Vec<Contact> = +                                items.into_iter().map(|item| item.into()).collect(); +                            if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { +                                self.update_sender +                                    .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( +                                        e.into(), +                                    )))) +                                    .await; +                            }; +                            result_sender.send(Ok(contacts)); +                            return; +                        } +                        ref s @ Stanza::Iq(Iq { +                            from: _, +                            ref id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            ref errors, +                        }) if *id == iq_id && r#type == IqType::Error => { +                            if let Some(error) = errors.first() { +                                result_sender.send(Err(RosterError::StanzaError(error.clone()))); +                            } else { +                                result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                            } +                            return; +                        } +                        s => { +                            result_sender.send(Err(RosterError::UnexpectedStanza(s))); +                            return; +                        } +                    }, +                    Ok(Err(e)) => { +                        result_sender.send(Err(RosterError::Read(e))); +                        return; +                    } +                    Err(e) => { +                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                        return; +                    } +                } +            } +            Command::GetChats(sender) => { +                let chats = self.db.read_chats().await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChatsOrdered(sender) => { +                let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChatsOrderedWithLatestMessages(sender) => { +                let chats = self +                    .db +                    .read_chats_ordered_with_latest_messages() +                    .await +                    .map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChat(jid, sender) => { +                let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetMessages(jid, sender) => { +                let messages = self +                    .db +                    .read_message_history(jid) +                    .await +                    .map_err(|e| e.into()); +                sender.send(messages); +            } +            Command::DeleteChat(jid, sender) => { +                let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); +                sender.send(result); +            } +            Command::DeleteMessage(uuid, sender) => { +                let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); +                sender.send(result); +            } +            Command::GetUser(jid, sender) => { +                let user = self.db.read_user(jid).await.map_err(|e| e.into()); +                sender.send(user); +            } +            // TODO: offline queue to modify roster +            Command::AddContact(jid, sender) => { +                let iq_id = Uuid::new_v4().to_string(); +                let set_stanza = Stanza::Iq(Iq { +                    from: Some(connection.jid().clone()), +                    id: iq_id.clone(), +                    to: None, +                    r#type: IqType::Set, +                    lang: None, +                    query: Some(iq::Query::Roster(stanza::roster::Query { +                        ver: None, +                        items: vec![stanza::roster::Item { +                            approved: None, +                            ask: false, +                            jid, +                            name: None, +                            subscription: None, +                            groups: Vec::new(), +                        }], +                    })), +                    errors: Vec::new(), +                }); +                let (send, recv) = oneshot::channel(); +                { +                    self.pending.lock().await.insert(iq_id.clone(), send); +                } +                // TODO: write_handle send helper function +                let result = connection.write_handle().write(set_stanza).await; +                if let Err(e) = result { +                    sender.send(Err(RosterError::Write(e))); +                    return; +                } +                let iq_result = recv.await; +                match iq_result { +                    Ok(i) => match i { +                        Ok(iq_result) => match iq_result { +                            Stanza::Iq(Iq { +                                from: _, +                                id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                errors: _, +                            }) if id == iq_id && r#type == IqType::Result => { +                                sender.send(Ok(())); +                                return; +                            } +                            ref s @ Stanza::Iq(Iq { +                                from: _, +                                ref id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                ref errors, +                            }) if *id == iq_id && r#type == IqType::Error => { +                                if let Some(error) = errors.first() { +                                    sender.send(Err(RosterError::StanzaError(error.clone()))); +                                } else { +                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                                } +                                return; +                            } +                            s => { +                                sender.send(Err(RosterError::UnexpectedStanza(s))); +                                return; +                            } +                        }, +                        Err(e) => { +                            sender.send(Err(e.into())); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                        return; +                    } +                } +            } +            Command::BuddyRequest(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid.clone()), +                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                match result { +                    Err(_) => { +                        let _ = sender.send(result); +                    } +                    Ok(()) => { +                        let presence = Stanza::Presence(stanza::client::presence::Presence { +                            from: None, +                            id: None, +                            to: Some(jid), +                            r#type: Some(stanza::client::presence::PresenceType::Subscribed), +                            lang: None, +                            show: None, +                            status: None, +                            priority: None, +                            errors: Vec::new(), +                            delay: None, +                        }); +                        let result = connection.write_handle().write(presence).await; +                        let _ = sender.send(result); +                    } +                } +            } +            Command::SubscriptionRequest(jid, sender) => { +                // TODO: i should probably have builders +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid), +                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                let _ = sender.send(result); +            } +            Command::AcceptBuddyRequest(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid.clone()), +                    r#type: Some(stanza::client::presence::PresenceType::Subscribed), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                match result { +                    Err(_) => { +                        let _ = sender.send(result); +                    } +                    Ok(()) => { +                        let presence = Stanza::Presence(stanza::client::presence::Presence { +                            from: None, +                            id: None, +                            to: Some(jid), +                            r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                            lang: None, +                            show: None, +                            status: None, +                            priority: None, +                            errors: Vec::new(), +                            delay: None, +                        }); +                        let result = connection.write_handle().write(presence).await; +                        let _ = sender.send(result); +                    } +                } +            } +            Command::AcceptSubscriptionRequest(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid), +                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                let _ = sender.send(result); +            } +            Command::UnsubscribeFromContact(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid), +                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                let _ = sender.send(result); +            } +            Command::UnsubscribeContact(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid), +                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                let _ = sender.send(result); +            } +            Command::UnfriendContact(jid, sender) => { +                let presence = Stanza::Presence(stanza::client::presence::Presence { +                    from: None, +                    id: None, +                    to: Some(jid.clone()), +                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +                    lang: None, +                    show: None, +                    status: None, +                    priority: None, +                    errors: Vec::new(), +                    delay: None, +                }); +                let result = connection.write_handle().write(presence).await; +                match result { +                    Err(_) => { +                        let _ = sender.send(result); +                    } +                    Ok(()) => { +                        let presence = Stanza::Presence(stanza::client::presence::Presence { +                            from: None, +                            id: None, +                            to: Some(jid), +                            r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +                            lang: None, +                            show: None, +                            status: None, +                            priority: None, +                            errors: Vec::new(), +                            delay: None, +                        }); +                        let result = connection.write_handle().write(presence).await; +                        let _ = sender.send(result); +                    } +                } +            } +            Command::DeleteContact(jid, sender) => { +                let iq_id = Uuid::new_v4().to_string(); +                let set_stanza = Stanza::Iq(Iq { +                    from: Some(connection.jid().clone()), +                    id: iq_id.clone(), +                    to: None, +                    r#type: IqType::Set, +                    lang: None, +                    query: Some(iq::Query::Roster(stanza::roster::Query { +                        ver: None, +                        items: vec![stanza::roster::Item { +                            approved: None, +                            ask: false, +                            jid, +                            name: None, +                            subscription: Some(stanza::roster::Subscription::Remove), +                            groups: Vec::new(), +                        }], +                    })), +                    errors: Vec::new(), +                }); +                let (send, recv) = oneshot::channel(); +                { +                    self.pending.lock().await.insert(iq_id.clone(), send); +                } +                let result = connection.write_handle().write(set_stanza).await; +                if let Err(e) = result { +                    sender.send(Err(RosterError::Write(e))); +                    return; +                } +                let iq_result = recv.await; +                match iq_result { +                    Ok(i) => match i { +                        Ok(iq_result) => match iq_result { +                            Stanza::Iq(Iq { +                                from: _, +                                id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                errors: _, +                            }) if id == iq_id && r#type == IqType::Result => { +                                sender.send(Ok(())); +                                return; +                            } +                            ref s @ Stanza::Iq(Iq { +                                from: _, +                                ref id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                ref errors, +                            }) if *id == iq_id && r#type == IqType::Error => { +                                if let Some(error) = errors.first() { +                                    sender.send(Err(RosterError::StanzaError(error.clone()))); +                                } else { +                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                                } +                                return; +                            } +                            s => { +                                sender.send(Err(RosterError::UnexpectedStanza(s))); +                                return; +                            } +                        }, +                        Err(e) => { +                            sender.send(Err(e.into())); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                        return; +                    } +                } +            } +            Command::UpdateContact(jid, contact_update, sender) => { +                let iq_id = Uuid::new_v4().to_string(); +                let groups = Vec::from_iter( +                    contact_update +                        .groups +                        .into_iter() +                        .map(|group| stanza::roster::Group(Some(group))), +                ); +                let set_stanza = Stanza::Iq(Iq { +                    from: Some(connection.jid().clone()), +                    id: iq_id.clone(), +                    to: None, +                    r#type: IqType::Set, +                    lang: None, +                    query: Some(iq::Query::Roster(stanza::roster::Query { +                        ver: None, +                        items: vec![stanza::roster::Item { +                            approved: None, +                            ask: false, +                            jid, +                            name: contact_update.name, +                            subscription: None, +                            groups, +                        }], +                    })), +                    errors: Vec::new(), +                }); +                let (send, recv) = oneshot::channel(); +                { +                    self.pending.lock().await.insert(iq_id.clone(), send); +                } +                let result = connection.write_handle().write(set_stanza).await; +                if let Err(e) = result { +                    sender.send(Err(RosterError::Write(e))); +                    return; +                } +                let iq_result = recv.await; +                match iq_result { +                    Ok(i) => match i { +                        Ok(iq_result) => match iq_result { +                            Stanza::Iq(Iq { +                                from: _, +                                id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                errors: _, +                            }) if id == iq_id && r#type == IqType::Result => { +                                sender.send(Ok(())); +                                return; +                            } +                            ref s @ Stanza::Iq(Iq { +                                from: _, +                                ref id, +                                to: _, +                                r#type, +                                lang: _, +                                query: _, +                                ref errors, +                            }) if *id == iq_id && r#type == IqType::Error => { +                                if let Some(error) = errors.first() { +                                    sender.send(Err(RosterError::StanzaError(error.clone()))); +                                } else { +                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                                } +                                return; +                            } +                            s => { +                                sender.send(Err(RosterError::UnexpectedStanza(s))); +                                return; +                            } +                        }, +                        Err(e) => { +                            sender.send(Err(e.into())); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                        return; +                    } +                } +            } +            Command::SetStatus(online, sender) => { +                let result = self.db.upsert_cached_status(online.clone()).await; +                if let Err(e) = result { +                    let _ = self +                        .update_sender +                        .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( +                            e.into(), +                        )))) +                        .await; +                } +                let result = connection +                    .write_handle() +                    .write(Stanza::Presence(online.into_stanza(None))) +                    .await +                    .map_err(|e| StatusError::Write(e)); +                // .map_err(|e| StatusError::Write(e)); +                let _ = sender.send(result); +            } +            // TODO: offline message queue +            Command::SendMessage(jid, body, sender) => { +                let id = Uuid::new_v4(); +                let message = Stanza::Message(stanza::client::message::Message { +                    from: Some(connection.jid().clone()), +                    id: Some(id.to_string()), +                    to: Some(jid.clone()), +                    // TODO: specify message type +                    r#type: stanza::client::message::MessageType::Chat, +                    // TODO: lang ? +                    lang: None, +                    subject: None, +                    body: Some(stanza::client::message::Body { +                        lang: None, +                        body: Some(body.body.clone()), +                    }), +                    thread: None, +                    delay: None, +                }); +                let _ = sender.send(Ok(())); +                // let _ = sender.send(Ok(message.clone())); +                let result = connection.write_handle().write(message).await; +                match result { +                    Ok(_) => { +                        let mut message = Message { +                            id, +                            from: connection.jid().clone(), +                            body, +                            timestamp: Utc::now(), +                        }; +                        info!("send message {:?}", message); +                        if let Err(e) = self +                            .db +                            .create_message_with_self_resource_and_chat( +                                message.clone(), +                                jid.clone(), +                            ) +                            .await +                            .map_err(|e| e.into()) +                        { +                            tracing::error!("{}", e); +                            let _ = +                                self.update_sender +                                    .send(UpdateMessage::Error(Error::MessageSend( +                                        error::MessageSendError::MessageHistory(e), +                                    ))); +                        } +                        // TODO: don't do this, have separate from from details +                        message.from = message.from.as_bare(); +                        let _ = self +                            .update_sender +                            .send(UpdateMessage::Message { to: jid, message }) +                            .await; +                    } +                    Err(_) => { +                        // let _ = sender.send(result); +                    } +                } +            } +            Command::SendPresence(jid, presence, sender) => { +                let mut presence: stanza::client::presence::Presence = presence.into(); +                if let Some(jid) = jid { +                    presence.to = Some(jid); +                }; +                let result = connection +                    .write_handle() +                    .write(Stanza::Presence(presence)) +                    .await; +                // .map_err(|e| StatusError::Write(e)); +                let _ = sender.send(result); +            } +        } +    } + +    async fn handle_offline(self, command: Command) { +        match command { +            Command::GetRoster(sender) => { +                let roster = self.db.read_cached_roster().await; +                match roster { +                    Ok(roster) => { +                        let _ = sender.send(Ok(roster)); +                    } +                    Err(e) => { +                        let _ = sender.send(Err(RosterError::Cache(e.into()))); +                    } +                } +            } +            Command::GetChats(sender) => { +                let chats = self.db.read_chats().await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChatsOrdered(sender) => { +                let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChatsOrderedWithLatestMessages(sender) => { +                let chats = self +                    .db +                    .read_chats_ordered_with_latest_messages() +                    .await +                    .map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetChat(jid, sender) => { +                let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); +                sender.send(chats); +            } +            Command::GetMessages(jid, sender) => { +                let messages = self +                    .db +                    .read_message_history(jid) +                    .await +                    .map_err(|e| e.into()); +                sender.send(messages); +            } +            Command::DeleteChat(jid, sender) => { +                let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); +                sender.send(result); +            } +            Command::DeleteMessage(uuid, sender) => { +                let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); +                sender.send(result); +            } +            Command::GetUser(jid, sender) => { +                let user = self.db.read_user(jid).await.map_err(|e| e.into()); +                sender.send(user); +            } +            // TODO: offline queue to modify roster +            Command::AddContact(_jid, sender) => { +                sender.send(Err(RosterError::Write(WriteError::Disconnected))); +            } +            Command::BuddyRequest(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::SubscriptionRequest(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::AcceptBuddyRequest(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::AcceptSubscriptionRequest(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::UnsubscribeFromContact(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::UnsubscribeContact(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::UnfriendContact(_jid, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::DeleteContact(_jid, sender) => { +                sender.send(Err(RosterError::Write(WriteError::Disconnected))); +            } +            Command::UpdateContact(_jid, _contact_update, sender) => { +                sender.send(Err(RosterError::Write(WriteError::Disconnected))); +            } +            Command::SetStatus(online, sender) => { +                let result = self +                    .db +                    .upsert_cached_status(online) +                    .await +                    .map_err(|e| StatusError::Cache(e.into())); +                sender.send(result); +            } +            // TODO: offline message queue +            Command::SendMessage(_jid, _body, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +            Command::SendPresence(_jid, _presence, sender) => { +                sender.send(Err(WriteError::Disconnected)); +            } +        } +    } +    // pub async fn handle_stream_error(self, error) {} +    // stanza errors (recoverable) +    // pub async fn handle_error(self, error: Error) {} +    // when it aborts, must clear iq map no matter what +    async fn on_abort(self) { +        let mut iqs = self.pending.lock().await; +        for (_id, sender) in iqs.drain() { +            let _ = sender.send(Err(ReadError::LostConnection)); +        } +    } + +    async fn handle_connection_error(self, error: ConnectionError) { +        self.update_sender +            .send(UpdateMessage::Error( +                ConnectionError::AlreadyConnected.into(), +            )) +            .await; +    } +} + +impl From<Command> for CoreClientCommand<Command> { +    fn from(value: Command) -> Self { +        CoreClientCommand::Command(value) +    } +} + +#[derive(Debug, Clone)] +pub enum UpdateMessage { +    Error(Error), +    Online(Online, Vec<Contact>), +    Offline(Offline), +    /// received roster from jabber server (replace full app roster state with this) +    /// is this needed? +    FullRoster(Vec<Contact>), +    /// (only update app roster state, don't replace) +    RosterUpdate(Contact), +    RosterDelete(JID), +    /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone +    Presence { +        from: JID, +        presence: Presence, +    }, +    // TODO: receipts +    // MessageDispatched(Uuid), +    Message { +        to: JID, +        message: Message, +    }, +    SubscriptionRequest(jid::JID), +} diff --git a/luz/src/presence.rs b/filamento/src/presence.rs index e35761c..e35761c 100644 --- a/luz/src/presence.rs +++ b/filamento/src/presence.rs diff --git a/luz/src/roster.rs b/filamento/src/roster.rs index 43c32f5..43c32f5 100644 --- a/luz/src/roster.rs +++ b/filamento/src/roster.rs diff --git a/luz/src/user.rs b/filamento/src/user.rs index 9914d14..9914d14 100644 --- a/luz/src/user.rs +++ b/filamento/src/user.rs | 
