diff options
Diffstat (limited to 'filamento')
| -rw-r--r-- | filamento/Cargo.toml | 4 | ||||
| -rw-r--r-- | filamento/src/chat.rs | 14 | ||||
| -rw-r--r-- | filamento/src/db.rs | 41 | ||||
| -rw-r--r-- | filamento/src/error.rs | 84 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 24 | ||||
| -rw-r--r-- | filamento/src/logic/local_only.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 8 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 8 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 10 | ||||
| -rw-r--r-- | filamento/src/roster.rs | 4 | ||||
| -rw-r--r-- | filamento/src/user.rs | 3 | 
11 files changed, 181 insertions, 26 deletions
| diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index 04fd90c..cc05288 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -3,6 +3,9 @@ name = "filamento"  version = "0.1.0"  edition = "2024" +[features] +serde = ["dep:serde", "jid/serde", "uuid/serde", "chrono/serde", "lampada/serde"] +  [dependencies]  futures = "0.3.30"  lampada = { version = "0.1.0", path = "../lampada" } @@ -21,6 +24,7 @@ base64 = "0.22.1"  sha1 = "0.10.6"  image = "0.25.6"  hex = "0.4.3" +serde = { version = "1.0.219", features = ["derive"], optional = true }  [target.'cfg(not(target_arch = "wasm32"))'.dependencies]  tokio = { workspace = true, features = ["sync", "time", "rt", "fs", "io-std"] } diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs index 2aa2282..936613e 100644 --- a/filamento/src/chat.rs +++ b/filamento/src/chat.rs @@ -1,12 +1,13 @@  use chrono::{DateTime, Utc};  use jid::JID;  use rusqlite::{ -    ToSql,      types::{FromSql, ToSqlOutput, Value}, +    ToSql,  };  use uuid::Uuid; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]  pub struct Message {      pub id: Uuid,      // does not contain full user information @@ -20,7 +21,8 @@ pub struct Message {      pub body: Body,  } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]  pub enum Delivery {      Sending,      Written, @@ -67,13 +69,15 @@ impl FromSql for Delivery {  //     Outside,  // } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]  pub struct Body {      // TODO: rich text, other contents, threads      pub body: String,  } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]  pub struct Chat {      pub correspondent: JID,      pub have_chatted: bool, diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 36ce7bf..d2c24a2 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -14,7 +14,7 @@ use crate::{      user::User,  }; -#[derive(Clone)] +#[derive(Clone, Debug)]  pub struct Db {      db: Arc<Mutex<rusqlite::Connection>>,  } @@ -84,6 +84,7 @@ impl Db {          Ok(())      } +    // TODO: this is not a 'read' user      pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {          let db = self.db().await;          let user_opt = db @@ -556,6 +557,7 @@ impl Db {          Ok(chats)      } +    #[tracing::instrument]      async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {          let chat_id = self.db().await.query_row(              "select id from chats where correspondent = ?1", @@ -579,6 +581,7 @@ impl Db {      }      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. +    #[tracing::instrument]      pub(crate) async fn create_message(          &self,          message: Message, @@ -587,6 +590,7 @@ impl Db {      ) -> Result<(), Error> {          let from_jid = from.as_bare();          let chat_id = self.read_chat_id(chat).await?; +        tracing::debug!("creating message");          self.db().await.execute("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", (&message.id, &message.body.body, &chat_id, &from_jid, &from.resourcepart, &message.timestamp, &message.delivery))?;          Ok(())      } @@ -596,7 +600,7 @@ impl Db {          let db = self.db().await;          db.execute(              "insert into users (jid) values (?1) on conflict do nothing", -            [&chat], +            [&bare_chat],          )?;          let id = Uuid::new_v4();          db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?; @@ -614,6 +618,7 @@ impl Db {      }      /// create direct message from incoming. MUST upsert chat and user +    #[tracing::instrument]      pub(crate) async fn create_message_with_user_resource(          &self,          message: Message, @@ -624,6 +629,8 @@ impl Db {          from: JID,      ) -> Result<(), Error> {          let from_jid = from.as_bare(); +        let chat = chat.as_bare(); +        tracing::debug!("creating resource");          if let Some(resource) = &from.resourcepart {              self.db().await.execute(                  "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing", @@ -634,6 +641,18 @@ impl Db {          Ok(())      } +    pub(crate) async fn update_message_delivery( +        &self, +        message: Uuid, +        delivery: Delivery, +    ) -> Result<(), Error> { +        self.db().await.execute( +            "update messages set delivery = ?1 where id = ?2", +            (delivery, message), +        )?; +        Ok(()) +    } +      // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {      //     Ok(Message {      //         id: Uuid, @@ -653,6 +672,24 @@ impl Db {          Ok(())      } +    pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { +        let message = self.db().await.query_row( +            "select id, from_jid, delivery, timestamp, body from messages where id = ?1", +            [&message], +            |row| { +                Ok(Message { +                    id: row.get(0)?, +                    // TODO: full from +                    from: row.get(1)?, +                    delivery: row.get(2)?, +                    timestamp: row.get(3)?, +                    body: Body { body: row.get(4)? }, +                }) +            }, +        )?; +        Ok(message) +    } +      // TODO: paging      pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {          let chat_id = self.read_chat_id(chat).await?; diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 6b7d0ae..bf28160 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -3,11 +3,12 @@ use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc};  use base64::DecodeError;  use image::ImageError;  use jid::JID; -use lampada::error::{ActorError, ConnectionError, ReadError, WriteError}; -use stanza::client::{Stanza, iq::Query}; +use lampada::error::{ActorError, ReadError, WriteError}; +use stanza::client::{iq::Query, Stanza};  use thiserror::Error;  pub use lampada::error::CommandError; +pub use lampada::error::ConnectionError;  use crate::files::FileStore; @@ -166,11 +167,86 @@ pub enum ResponseError {  #[derive(Debug, Error, Clone)]  #[error("database error: {0}")] -pub struct DatabaseError(pub Arc<rusqlite::Error>); +pub struct DatabaseError(pub Serializeable<Arc<rusqlite::Error>>); + +pub enum Serializeable<T> { +    String(String), +    Unserialized(T), +} + +impl<T: std::fmt::Display> std::fmt::Display for Serializeable<T> { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        match &self { +            Serializeable::String(s) => s.fmt(f), +            Serializeable::Unserialized(t) => t.fmt(f), +        } +    } +} + +impl<T: std::fmt::Debug> std::fmt::Debug for Serializeable<T> { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        match &self { +            Serializeable::String(s) => s.fmt(f), +            Serializeable::Unserialized(t) => t.fmt(f), +        } +    } +} + +impl<T: Clone> Clone for Serializeable<T> { +    fn clone(&self) -> Self { +        match self { +            Serializeable::String(s) => Self::String(s.clone()), +            Serializeable::Unserialized(t) => Self::Unserialized(t.clone()), +        } +    } +} + +#[cfg(feature = "serde")] +struct StringVisitor; + +#[cfg(feature = "serde")] +impl<'de> serde::de::Visitor<'de> for StringVisitor { +    type Value = String; + +    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { +        formatter.write_str("a string") +    } + +    fn visit_string<E>(self, v: String) -> Result<Self::Value, E> +    where +        E: serde::de::Error, +    { +        Ok(v) +    } +} + +#[cfg(feature = "serde")] +impl<'de> serde::Deserialize<'de> for DatabaseError { +    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> +    where +        D: serde::Deserializer<'de>, +    { +        let string = deserializer.deserialize_string(StringVisitor)?; +        Ok(Self(Serializeable::String(string))) +    } +} + +#[cfg(feature = "serde")] +impl serde::Serialize for DatabaseError { +    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> +    where +        S: serde::Serializer, +    { +        match &self.0 { +            Serializeable::String(s) => serializer.serialize_str(s), +            Serializeable::Unserialized(u) => serializer.serialize_str(&u.to_string()), +        } +    } +}  impl From<rusqlite::Error> for DatabaseError {      fn from(e: rusqlite::Error) -> Self { -        Self(Arc::new(e)) +        Self(Serializeable::Unserialized(Arc::new(e)))      }  } diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index e06f7c6..18ceb9d 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -70,6 +70,7 @@ pub enum Command<Fs: FileStore> {      /// get a specific chat by jid      GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),      /// get message history for chat (does appropriate mam things) +    GetMessage(Uuid, oneshot::Sender<Result<Message, DatabaseError>>),      // TODO: paging and filtering      GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),      /// get message history for chat (does appropriate mam things) @@ -268,8 +269,15 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {  }  impl<Fs: FileStore> Client<Fs> { -    pub async fn connect(&self) -> Result<(), ActorError> { -        self.send(CoreClientCommand::Connect).await?; +    pub async fn connect(&self) -> Result<(), CommandError<ConnectionError>> { +        let (send, recv) = oneshot::channel::<Result<(), ConnectionError>>(); +        self.send(CoreClientCommand::Connect(send)) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;          Ok(())      } @@ -374,6 +382,18 @@ impl<Fs: FileStore> Client<Fs> {          Ok(chat)      } +    pub async fn get_message(&self, id: Uuid) -> Result<Message, CommandError<DatabaseError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetMessage(id, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let message = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(message) +    } +      pub async fn get_messages(          &self,          jid: JID, diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index dc94d2c..a22efbd 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -44,6 +44,13 @@ pub async fn handle_get_chat<Fs: FileStore + Clone>(      Ok(logic.db().read_chat(jid).await?)  } +pub async fn handle_get_message<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    id: Uuid, +) -> Result<Message, DatabaseError> { +    Ok(logic.db().read_message(id).await?) +} +  pub async fn handle_get_messages<Fs: FileStore + Clone>(      logic: &ClientLogic<Fs>,      jid: JID, diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 55e3d4a..42a38ca 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -23,8 +23,8 @@ use super::{      local_only::{          handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,          handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, -        handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, -        handle_get_messages_with_users, handle_get_user, +        handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, +        handle_get_messages, handle_get_messages_with_users, handle_get_user,      },  }; @@ -89,6 +89,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(              let chats = handle_get_chat(logic, jid).await;              sender.send(chats);          } +        Command::GetMessage(id, sender) => { +            let message = handle_get_message(logic, id).await; +            let _ = sender.send(message); +        }          Command::GetMessages(jid, sender) => {              let messages = handle_get_messages(logic, jid).await;              sender.send(messages); diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index c0c3a7f..3936584 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -23,6 +23,7 @@ use crate::{  use super::{      local_only::{ +        handle_get_message,          handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user      }, ClientLogic  }; @@ -583,6 +584,9 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,      match result {          Ok(_) => {              info!("sent message: {:?}", message_stanza); +            if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await { +                error!("updating message delivery: {}", e); +            }              logic                  .update_sender()                  .send(UpdateMessage::MessageDelivery { @@ -1107,6 +1111,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(              let chat = handle_get_chat(logic, jid).await;              let _ = sender.send(chat);          } +        Command::GetMessage(id, sender) => { +            let message = handle_get_message(logic, id).await; +            let _ = sender.send(message); +        }          Command::GetMessages(jid, sender) => {              let messages = handle_get_messages(logic, jid).await;              let _ = sender.send(messages); diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index cdaff97..a5d40b2 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -90,17 +90,11 @@ pub async fn recv_message<Fs: FileStore + Clone>(                          )                          .await                      { -                        logic -                            .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) -                            .await; -                        error!("failed to upsert chat and user") +                        error!("failed to create message: {}", e);                      }                  }                  Err(e) => { -                    logic -                        .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) -                        .await; -                    error!("failed to upsert chat and user") +                    error!("failed to upsert chat and user: {}", e);                  }              }; diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs index 99682b1..8f77086 100644 --- a/filamento/src/roster.rs +++ b/filamento/src/roster.rs @@ -11,7 +11,7 @@ pub struct ContactUpdate {      pub groups: HashSet<String>,  } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)]  pub struct Contact {      // jid is the id used to reference everything, but not the primary key      pub user_jid: JID, @@ -24,7 +24,7 @@ pub struct Contact {      pub groups: HashSet<String>,  } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)]  pub enum Subscription {      None,      PendingOut, diff --git a/filamento/src/user.rs b/filamento/src/user.rs index b2ea8e4..f19a4ad 100644 --- a/filamento/src/user.rs +++ b/filamento/src/user.rs @@ -1,6 +1,7 @@  use jid::JID; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]  pub struct User {      pub jid: JID,      pub nick: Option<String>, | 
