aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/db.rs')
-rw-r--r--filamento/src/db.rs406
1 files changed, 367 insertions, 39 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index c19f16c..d9206cc 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -1,12 +1,12 @@
use std::{collections::HashSet, path::Path};
-use chrono::Utc;
+use chrono::{DateTime, Utc};
use jid::JID;
use sqlx::{SqlitePool, migrate};
use uuid::Uuid;
use crate::{
- chat::{Chat, Message},
+ chat::{Body, Chat, Delivery, Message},
error::{DatabaseError as Error, DatabaseOpenError},
presence::Online,
roster::Contact,
@@ -51,10 +51,9 @@ impl Db {
pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",
+ "insert into users ( jid, nick ) values ( ?, ? )",
user.jid,
user.nick,
- user.cached_status_message
)
.execute(&self.db)
.await?;
@@ -75,22 +74,116 @@ impl Db {
Ok(user)
}
- pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> {
- sqlx::query!(
- "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?",
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
+ if sqlx::query!(
+ "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
+ jid,
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
+ let rows_affected = sqlx::query!(
+ "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
jid,
nick,
+ nick,
nick
)
.execute(&self.db)
- .await?;
- Ok(())
+ .await?
+ .rows_affected();
+ tracing::debug!("rows affected: {}", rows_affected);
+ if rows_affected > 0 {
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn delete_user_avatar(
+ &self,
+ jid: JID,
+ ) -> Result<(bool, Option<String>), Error> {
+ #[derive(sqlx::FromRow)]
+ struct AvatarRow {
+ avatar: Option<String>,
+ }
+ let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
+ .bind(jid.clone())
+ .fetch_optional(&self.db)
+ .await?
+ .map(|row: AvatarRow| row.avatar)
+ .unwrap_or(None);
+ if sqlx::query!(
+ "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
+ jid,
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok((true, old_avatar))
+ } else {
+ Ok((false, old_avatar))
+ }
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn upsert_user_avatar(
+ &self,
+ jid: JID,
+ avatar: String,
+ ) -> Result<(bool, Option<String>), Error> {
+ #[derive(sqlx::FromRow)]
+ struct AvatarRow {
+ avatar: Option<String>,
+ }
+ let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
+ .bind(jid.clone())
+ .fetch_optional(&self.db)
+ .await?
+ .map(|row: AvatarRow| row.avatar)
+ .unwrap_or(None);
+ if sqlx::query!(
+ "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
+ jid,
+ avatar,
+ avatar,
+ avatar,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok((true, old_avatar))
+ } else {
+ Ok((false, old_avatar))
+ }
}
pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "update users set cached_status_message = ?, nick = ? where jid = ?",
- user.cached_status_message,
+ "update users set nick = ? where jid = ?",
user.nick,
user.jid
)
@@ -266,10 +359,9 @@ impl Db {
}
pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
- let mut roster: Vec<Contact> =
- sqlx::query_as("select * from roster join users on jid = user_jid")
- .fetch_all(&self.db)
- .await?;
+ let mut roster: Vec<Contact> = sqlx::query_as("select * from roster")
+ .fetch_all(&self.db)
+ .await?;
for contact in &mut roster {
#[derive(sqlx::FromRow)]
struct Row {
@@ -285,13 +377,47 @@ impl Db {
Ok(roster)
}
+ pub(crate) async fn read_cached_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ #[sqlx(flatten)]
+ contact: Contact,
+ #[sqlx(flatten)]
+ user: User,
+ }
+ let mut roster: Vec<Row> =
+ sqlx::query_as("select * from roster join users on jid = user_jid")
+ .fetch_all(&self.db)
+ .await?;
+ for row in &mut roster {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&row.contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ }
+ let roster = roster
+ .into_iter()
+ .map(|row| (row.contact, row.user))
+ .collect();
+ Ok(roster)
+ }
+
pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
sqlx::query!(
- "insert into chats (id, correspondent) values (?, ?)",
+ "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)",
id,
- jid
+ jid,
+ chat.have_chatted,
)
.execute(&self.db)
.await?;
@@ -371,21 +497,197 @@ impl Db {
&self,
) -> Result<Vec<(Chat, Message)>, Error> {
#[derive(sqlx::FromRow)]
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageRow {
+ #[sqlx(flatten)]
+ pub chat: RowChat,
+ #[sqlx(flatten)]
+ pub message: RowMessage,
+ }
+
pub struct ChatWithMessage {
+ chat: Chat,
+ message: Message,
+ }
+
+ impl From<ChatWithMessageRow> for ChatWithMessage {
+ fn from(value: ChatWithMessageRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ message: value.message.into(),
+ }
+ }
+ }
+
+ // TODO: sometimes chats have no messages.
+ let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+
+ let chats = chats
+ .into_iter()
+ .map(|chat_with_message_row| {
+ let chat_with_message: ChatWithMessage = chat_with_message_row.into();
+ (chat_with_message.chat, chat_with_message.message)
+ })
+ .collect();
+
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
+ #[derive(sqlx::FromRow)]
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowChatUser {
+ chat_user_jid: JID,
+ chat_user_nick: Option<String>,
+ chat_user_avatar: Option<String>,
+ }
+ impl From<RowChatUser> for User {
+ fn from(value: RowChatUser) -> Self {
+ Self {
+ jid: value.chat_user_jid,
+ nick: value.chat_user_nick,
+ avatar: value.chat_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessageUser {
+ message_user_jid: JID,
+ message_user_nick: Option<String>,
+ message_user_avatar: Option<String>,
+ }
+ impl From<RowMessageUser> for User {
+ fn from(value: RowMessageUser) -> Self {
+ Self {
+ jid: value.message_user_jid,
+ nick: value.message_user_nick,
+ avatar: value.message_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageAndUsersRow {
+ #[sqlx(flatten)]
+ pub chat: RowChat,
#[sqlx(flatten)]
- pub chat: Chat,
+ pub chat_user: RowChatUser,
#[sqlx(flatten)]
- pub message: Message,
+ pub message: RowMessage,
+ #[sqlx(flatten)]
+ pub message_user: RowMessageUser,
+ }
+
+ impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers {
+ fn from(value: ChatWithMessageAndUsersRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ chat_user: value.chat_user.into(),
+ message: value.message.into(),
+ message_user: value.message_user.into(),
+ }
+ }
}
- // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
- let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ pub struct ChatWithMessageAndUsers {
+ chat: Chat,
+ chat_user: User,
+ message: Message,
+ message_user: User,
+ }
+
+ let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")
.fetch_all(&self.db)
.await?;
let chats = chats
.into_iter()
- .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
+ .map(|chat_with_message_and_users_row| {
+ let chat_with_message_and_users: ChatWithMessageAndUsers =
+ chat_with_message_and_users_row.into();
+ (
+ (
+ chat_with_message_and_users.chat,
+ chat_with_message_and_users.chat_user,
+ ),
+ (
+ chat_with_message_and_users.message,
+ chat_with_message_and_users.message_user,
+ ),
+ )
+ })
.collect();
Ok(chats)
@@ -441,12 +743,14 @@ impl Db {
.execute(&self.db)
.await?;
let id = Uuid::new_v4();
- let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *")
+ let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?")
.bind(id)
- .bind(bare_chat)
+ .bind(bare_chat.clone())
.bind(false)
+ .bind(bare_chat)
.fetch_one(&self.db)
.await?;
+ tracing::debug!("CHECKING chat: {:?}", chat);
Ok(chat.have_chatted)
}
@@ -472,7 +776,7 @@ impl Db {
Ok(())
}
- // create direct message from incoming
+ /// create direct message from incoming. MUST upsert chat and user
pub(crate) async fn create_message_with_user_resource(
&self,
message: Message,
@@ -482,20 +786,20 @@ impl Db {
) -> Result<(), Error> {
let bare_chat = chat.as_bare();
let resource = &chat.resourcepart;
- sqlx::query!(
- "insert into users (jid) values (?) on conflict do nothing",
- bare_chat
- )
- .execute(&self.db)
- .await?;
- let id = Uuid::new_v4();
- sqlx::query!(
- "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
- id,
- bare_chat
- )
- .execute(&self.db)
- .await?;
+ // sqlx::query!(
+ // "insert into users (jid) values (?) on conflict do nothing",
+ // bare_chat
+ // )
+ // .execute(&self.db)
+ // .await?;
+ // let id = Uuid::new_v4();
+ // sqlx::query!(
+ // "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ // id,
+ // bare_chat
+ // )
+ // .execute(&self.db)
+ // .await?;
if let Some(resource) = resource {
sqlx::query!(
"insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
@@ -537,6 +841,30 @@ impl Db {
Ok(messages)
}
+ pub(crate) async fn read_message_history_with_users(
+ &self,
+ chat: JID,
+ ) -> Result<Vec<(Message, User)>, Error> {
+ let chat_id = self.read_chat_id(chat).await?;
+ #[derive(sqlx::FromRow)]
+ pub struct Row {
+ #[sqlx(flatten)]
+ user: User,
+ #[sqlx(flatten)]
+ message: Message,
+ }
+ let messages: Vec<Row> =
+ sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc")
+ .bind(chat_id)
+ .fetch_all(&self.db)
+ .await?;
+ let messages = messages
+ .into_iter()
+ .map(|row| (row.message, row.user))
+ .collect();
+ Ok(messages)
+ }
+
pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
let online: Online = sqlx::query_as("select * from cached_status where id = 0")
.fetch_one(&self.db)