aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src')
-rw-r--r--filamento/src/chat.rs14
-rw-r--r--filamento/src/db.rs41
-rw-r--r--filamento/src/error.rs84
-rw-r--r--filamento/src/lib.rs24
-rw-r--r--filamento/src/logic/local_only.rs7
-rw-r--r--filamento/src/logic/offline.rs8
-rw-r--r--filamento/src/logic/online.rs8
-rw-r--r--filamento/src/logic/process_stanza.rs10
-rw-r--r--filamento/src/roster.rs4
-rw-r--r--filamento/src/user.rs3
10 files changed, 177 insertions, 26 deletions
diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs
index 2aa2282..936613e 100644
--- a/filamento/src/chat.rs
+++ b/filamento/src/chat.rs
@@ -1,12 +1,13 @@
use chrono::{DateTime, Utc};
use jid::JID;
use rusqlite::{
- ToSql,
types::{FromSql, ToSqlOutput, Value},
+ ToSql,
};
use uuid::Uuid;
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Message {
pub id: Uuid,
// does not contain full user information
@@ -20,7 +21,8 @@ pub struct Message {
pub body: Body,
}
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum Delivery {
Sending,
Written,
@@ -67,13 +69,15 @@ impl FromSql for Delivery {
// Outside,
// }
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Body {
// TODO: rich text, other contents, threads
pub body: String,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Chat {
pub correspondent: JID,
pub have_chatted: bool,
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index 36ce7bf..d2c24a2 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -14,7 +14,7 @@ use crate::{
user::User,
};
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Db {
db: Arc<Mutex<rusqlite::Connection>>,
}
@@ -84,6 +84,7 @@ impl Db {
Ok(())
}
+ // TODO: this is not a 'read' user
pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
let db = self.db().await;
let user_opt = db
@@ -556,6 +557,7 @@ impl Db {
Ok(chats)
}
+ #[tracing::instrument]
async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
let chat_id = self.db().await.query_row(
"select id from chats where correspondent = ?1",
@@ -579,6 +581,7 @@ impl Db {
}
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
+ #[tracing::instrument]
pub(crate) async fn create_message(
&self,
message: Message,
@@ -587,6 +590,7 @@ impl Db {
) -> Result<(), Error> {
let from_jid = from.as_bare();
let chat_id = self.read_chat_id(chat).await?;
+ tracing::debug!("creating message");
self.db().await.execute("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", (&message.id, &message.body.body, &chat_id, &from_jid, &from.resourcepart, &message.timestamp, &message.delivery))?;
Ok(())
}
@@ -596,7 +600,7 @@ impl Db {
let db = self.db().await;
db.execute(
"insert into users (jid) values (?1) on conflict do nothing",
- [&chat],
+ [&bare_chat],
)?;
let id = Uuid::new_v4();
db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?;
@@ -614,6 +618,7 @@ impl Db {
}
/// create direct message from incoming. MUST upsert chat and user
+ #[tracing::instrument]
pub(crate) async fn create_message_with_user_resource(
&self,
message: Message,
@@ -624,6 +629,8 @@ impl Db {
from: JID,
) -> Result<(), Error> {
let from_jid = from.as_bare();
+ let chat = chat.as_bare();
+ tracing::debug!("creating resource");
if let Some(resource) = &from.resourcepart {
self.db().await.execute(
"insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing",
@@ -634,6 +641,18 @@ impl Db {
Ok(())
}
+ pub(crate) async fn update_message_delivery(
+ &self,
+ message: Uuid,
+ delivery: Delivery,
+ ) -> Result<(), Error> {
+ self.db().await.execute(
+ "update messages set delivery = ?1 where id = ?2",
+ (delivery, message),
+ )?;
+ Ok(())
+ }
+
// pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
// Ok(Message {
// id: Uuid,
@@ -653,6 +672,24 @@ impl Db {
Ok(())
}
+ pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ let message = self.db().await.query_row(
+ "select id, from_jid, delivery, timestamp, body from messages where id = ?1",
+ [&message],
+ |row| {
+ Ok(Message {
+ id: row.get(0)?,
+ // TODO: full from
+ from: row.get(1)?,
+ delivery: row.get(2)?,
+ timestamp: row.get(3)?,
+ body: Body { body: row.get(4)? },
+ })
+ },
+ )?;
+ Ok(message)
+ }
+
// TODO: paging
pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
let chat_id = self.read_chat_id(chat).await?;
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 6b7d0ae..bf28160 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -3,11 +3,12 @@ use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc};
use base64::DecodeError;
use image::ImageError;
use jid::JID;
-use lampada::error::{ActorError, ConnectionError, ReadError, WriteError};
-use stanza::client::{Stanza, iq::Query};
+use lampada::error::{ActorError, ReadError, WriteError};
+use stanza::client::{iq::Query, Stanza};
use thiserror::Error;
pub use lampada::error::CommandError;
+pub use lampada::error::ConnectionError;
use crate::files::FileStore;
@@ -166,11 +167,86 @@ pub enum ResponseError {
#[derive(Debug, Error, Clone)]
#[error("database error: {0}")]
-pub struct DatabaseError(pub Arc<rusqlite::Error>);
+pub struct DatabaseError(pub Serializeable<Arc<rusqlite::Error>>);
+
+pub enum Serializeable<T> {
+ String(String),
+ Unserialized(T),
+}
+
+impl<T: std::fmt::Display> std::fmt::Display for Serializeable<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match &self {
+ Serializeable::String(s) => s.fmt(f),
+ Serializeable::Unserialized(t) => t.fmt(f),
+ }
+ }
+}
+
+impl<T: std::fmt::Debug> std::fmt::Debug for Serializeable<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match &self {
+ Serializeable::String(s) => s.fmt(f),
+ Serializeable::Unserialized(t) => t.fmt(f),
+ }
+ }
+}
+
+impl<T: Clone> Clone for Serializeable<T> {
+ fn clone(&self) -> Self {
+ match self {
+ Serializeable::String(s) => Self::String(s.clone()),
+ Serializeable::Unserialized(t) => Self::Unserialized(t.clone()),
+ }
+ }
+}
+
+#[cfg(feature = "serde")]
+struct StringVisitor;
+
+#[cfg(feature = "serde")]
+impl<'de> serde::de::Visitor<'de> for StringVisitor {
+ type Value = String;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("a string")
+ }
+
+ fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(v)
+ }
+}
+
+#[cfg(feature = "serde")]
+impl<'de> serde::Deserialize<'de> for DatabaseError {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let string = deserializer.deserialize_string(StringVisitor)?;
+ Ok(Self(Serializeable::String(string)))
+ }
+}
+
+#[cfg(feature = "serde")]
+impl serde::Serialize for DatabaseError {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ match &self.0 {
+ Serializeable::String(s) => serializer.serialize_str(s),
+ Serializeable::Unserialized(u) => serializer.serialize_str(&u.to_string()),
+ }
+ }
+}
impl From<rusqlite::Error> for DatabaseError {
fn from(e: rusqlite::Error) -> Self {
- Self(Arc::new(e))
+ Self(Serializeable::Unserialized(Arc::new(e)))
}
}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index e06f7c6..18ceb9d 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -70,6 +70,7 @@ pub enum Command<Fs: FileStore> {
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
+ GetMessage(Uuid, oneshot::Sender<Result<Message, DatabaseError>>),
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
@@ -268,8 +269,15 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {
}
impl<Fs: FileStore> Client<Fs> {
- pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(CoreClientCommand::Connect).await?;
+ pub async fn connect(&self) -> Result<(), CommandError<ConnectionError>> {
+ let (send, recv) = oneshot::channel::<Result<(), ConnectionError>>();
+ self.send(CoreClientCommand::Connect(send))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(())
}
@@ -374,6 +382,18 @@ impl<Fs: FileStore> Client<Fs> {
Ok(chat)
}
+ pub async fn get_message(&self, id: Uuid) -> Result<Message, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let message = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(message)
+ }
+
pub async fn get_messages(
&self,
jid: JID,
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index dc94d2c..a22efbd 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -44,6 +44,13 @@ pub async fn handle_get_chat<Fs: FileStore + Clone>(
Ok(logic.db().read_chat(jid).await?)
}
+pub async fn handle_get_message<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ id: Uuid,
+) -> Result<Message, DatabaseError> {
+ Ok(logic.db().read_message(id).await?)
+}
+
pub async fn handle_get_messages<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
jid: JID,
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 55e3d4a..42a38ca 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -23,8 +23,8 @@ use super::{
local_only::{
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages,
- handle_get_messages_with_users, handle_get_user,
+ handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message,
+ handle_get_messages, handle_get_messages_with_users, handle_get_user,
},
};
@@ -89,6 +89,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let chats = handle_get_chat(logic, jid).await;
sender.send(chats);
}
+ Command::GetMessage(id, sender) => {
+ let message = handle_get_message(logic, id).await;
+ let _ = sender.send(message);
+ }
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
sender.send(messages);
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index c0c3a7f..3936584 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -23,6 +23,7 @@ use crate::{
use super::{
local_only::{
+ handle_get_message,
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user
}, ClientLogic
};
@@ -583,6 +584,9 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
match result {
Ok(_) => {
info!("sent message: {:?}", message_stanza);
+ if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await {
+ error!("updating message delivery: {}", e);
+ }
logic
.update_sender()
.send(UpdateMessage::MessageDelivery {
@@ -1107,6 +1111,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
}
+ Command::GetMessage(id, sender) => {
+ let message = handle_get_message(logic, id).await;
+ let _ = sender.send(message);
+ }
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index cdaff97..a5d40b2 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -90,17 +90,11 @@ pub async fn recv_message<Fs: FileStore + Clone>(
)
.await
{
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- error!("failed to upsert chat and user")
+ error!("failed to create message: {}", e);
}
}
Err(e) => {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- error!("failed to upsert chat and user")
+ error!("failed to upsert chat and user: {}", e);
}
};
diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs
index 99682b1..8f77086 100644
--- a/filamento/src/roster.rs
+++ b/filamento/src/roster.rs
@@ -11,7 +11,7 @@ pub struct ContactUpdate {
pub groups: HashSet<String>,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Contact {
// jid is the id used to reference everything, but not the primary key
pub user_jid: JID,
@@ -24,7 +24,7 @@ pub struct Contact {
pub groups: HashSet<String>,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Subscription {
None,
PendingOut,
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
index b2ea8e4..f19a4ad 100644
--- a/filamento/src/user.rs
+++ b/filamento/src/user.rs
@@ -1,6 +1,7 @@
use jid::JID;
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct User {
pub jid: JID,
pub nick: Option<String>,