aboutsummaryrefslogblamecommitdiffstats
path: root/filamento/src/db.rs
blob: d9206cc049ecfdb95fb0903d73af02b01ae815b1 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                            
 
                            
             
                                


               
                                          
                                                       
                     








                    
                        
         

























                                                       


                   
                                                                             
                     
                                                              
                     
                      





                          
                                                                            





                                                                            






                                                                            


                                                                                  
                                                                                                                 


                           
                           













                                                                                                

                                                                                                                 

                 
                 


                          
               


                                                            





















                                                                                                            
                                                                                                                       


                           
                           




























                                                                                                            
                                                                                                                       

                   

                   









                                   

     
                                                                             
                     
                                                      
                      







                                                                                                                                                                                                                                                              
                                                                                

                                                                                                                       
                                                                                      


























                                                                                     
                                                                                     
















                                                                                            
                                                                                                  




                                                                                             

















                                                                                                    
                                                                                      
































                                                                                    
                                                                                      








































                                                                                                                                          
                                                                                  






                                                                      
                                                                                                 

                                                                    
                                                



              
                                                                                  


                                                                             














                                                                                              
































                                                                                                  
                                                                             


                                       
                                                                                   
               

                              







                                                                                                                                                                                                                       
                                                                            







                                                                                                  










                                                                                     
                                                  

















                                                                                                             
                                      
 
                                                                            






                                                                       
                                                                       





                                                                    








                                                                                                                                                                                                                                                                                                                                    





                                                                



































                                                            
                            
                              
                            














                                                           

         
                                                  


















































































































                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          




                                













                                                                          




                       




                                                                    
                                  





















                                                                                                                    





                                       
                          
                                      
                                                     
                                                                                                                                                                                                                                                


              
                                                                                        


                                                                        
                      



                                
                                                                                                                                                                                   
                     
                                    
                        
                            

                                
                                                     












                                                          







                                                                                                  
                                                        


              
                                                                      
                                                          


                         

                   
                            

                                          













                                                                                            








                                                                                                  
                                                        


              
                                                                                      






                                                                                    
                                                                                                                      
 
                                                                                   






                                                                  
                                                                                               
                                                     




                                                                                             

                    
 























                                                                                                                          
                                                                            





                                                                                       
                                                                                          









                                                                                                                             
                                                                          




                                                                                         
























                                                                                                                                                                                                      
 
use std::{collections::HashSet, path::Path};

use chrono::{DateTime, Utc};
use jid::JID;
use sqlx::{SqlitePool, migrate};
use uuid::Uuid;

use crate::{
    chat::{Body, Chat, Delivery, Message},
    error::{DatabaseError as Error, DatabaseOpenError},
    presence::Online,
    roster::Contact,
    user::User,
};

#[derive(Clone)]
pub struct Db {
    db: SqlitePool,
}

// TODO: turn into trait
impl Db {
    pub async fn create_connect_and_migrate(
        path: impl AsRef<Path>,
    ) -> Result<Self, DatabaseOpenError> {
        if let Some(dir) = path.as_ref().parent() {
            if dir.is_dir() {
            } else {
                tokio::fs::create_dir_all(dir).await?;
            }
            let _file = tokio::fs::OpenOptions::new()
                .append(true)
                .create(true)
                .open(path.as_ref())
                .await?;
        }
        let url = format!(
            "sqlite://{}",
            path.as_ref()
                .to_str()
                .ok_or(DatabaseOpenError::InvalidPath)?
        );
        let db = SqlitePool::connect(&url).await?;
        migrate!().run(&db).await?;
        Ok(Self { db })
    }

    pub(crate) fn new(db: SqlitePool) -> Self {
        Self { db }
    }

    pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
        sqlx::query!(
            "insert into users ( jid, nick ) values ( ?, ? )",
            user.jid,
            user.nick,
        )
        .execute(&self.db)
        .await?;
        Ok(())
    }

    pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
        sqlx::query!(
            "insert into users ( jid ) values ( ? ) on conflict do nothing",
            user
        )
        .execute(&self.db)
        .await?;
        let user: User = sqlx::query_as("select * from users where jid = ?")
            .bind(user)
            .fetch_one(&self.db)
            .await?;
        Ok(user)
    }

    /// returns whether or not the nickname was updated
    pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
        if sqlx::query!(
            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
            jid,
            None::<String>,
            None::<String>,
            None::<String>,
        )
        .execute(&self.db)
        .await?
        .rows_affected()
            > 0
        {
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// returns whether or not the nickname was updated
    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
        let rows_affected = sqlx::query!(
            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
            jid,
            nick,
            nick,
            nick
        )
        .execute(&self.db)
        .await?
        .rows_affected();
        tracing::debug!("rows affected: {}", rows_affected);
        if rows_affected > 0 {
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) async fn delete_user_avatar(
        &self,
        jid: JID,
    ) -> Result<(bool, Option<String>), Error> {
        #[derive(sqlx::FromRow)]
        struct AvatarRow {
            avatar: Option<String>,
        }
        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
            .bind(jid.clone())
            .fetch_optional(&self.db)
            .await?
            .map(|row: AvatarRow| row.avatar)
            .unwrap_or(None);
        if sqlx::query!(
            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
            jid,
            None::<String>,
            None::<String>,
            None::<String>,
        )
        .execute(&self.db)
        .await?
        .rows_affected()
            > 0
        {
            Ok((true, old_avatar))
        } else {
            Ok((false, old_avatar))
        }
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) async fn upsert_user_avatar(
        &self,
        jid: JID,
        avatar: String,
    ) -> Result<(bool, Option<String>), Error> {
        #[derive(sqlx::FromRow)]
        struct AvatarRow {
            avatar: Option<String>,
        }
        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
            .bind(jid.clone())
            .fetch_optional(&self.db)
            .await?
            .map(|row: AvatarRow| row.avatar)
            .unwrap_or(None);
        if sqlx::query!(
            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
            jid,
            avatar,
            avatar,
            avatar,
        )
        .execute(&self.db)
        .await?
        .rows_affected()
            > 0
        {
            Ok((true, old_avatar))
        } else {
            Ok((false, old_avatar))
        }
    }

    pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
        sqlx::query!(
            "update users set nick = ? where jid = ?",
            user.nick,
            user.jid
        )
        .execute(&self.db)
        .await?;
        Ok(())
    }

    // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
    // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}

    /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
    pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
        sqlx::query!(
            "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
            contact.user_jid,
            contact.name,
            contact.subscription
        )
        .execute(&self.db)
        .await?;
        // TODO: abstract this out in to add_to_group() function ?
        for group in contact.groups {
            sqlx::query!(
                "insert into groups (group_name) values (?) on conflict do nothing",
                group
            )
            .execute(&self.db)
            .await?;
            sqlx::query!(
                "insert into groups_roster (group_name, contact_jid) values (?, ?)",
                group,
                contact.user_jid
            )
            .execute(&self.db)
            .await?;
        }
        Ok(())
    }

    pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
        let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
            .bind(contact)
            .fetch_one(&self.db)
            .await?;
        #[derive(sqlx::FromRow)]
        struct Row {
            group_name: String,
        }
        let groups: Vec<Row> =
            sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
                .bind(&contact.user_jid)
                .fetch_all(&self.db)
                .await?;
        contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
        Ok(contact)
    }

    pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
        let contact: Option<Contact> =
            sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?")
                .bind(contact)
                .fetch_optional(&self.db)
                .await?;
        if let Some(mut contact) = contact {
            #[derive(sqlx::FromRow)]
            struct Row {
                group_name: String,
            }
            let groups: Vec<Row> =
                sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
                    .bind(&contact.user_jid)
                    .fetch_all(&self.db)
                    .await?;
            contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
            Ok(Some(contact))
        } else {
            Ok(None)
        }
    }

    /// does not update the underlying user, to update user, update_user() must be called separately
    pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
        sqlx::query!(
            "update roster set name = ?, subscription = ? where user_jid = ?",
            contact.name,
            contact.subscription,
            contact.user_jid
        )
        .execute(&self.db)
        .await?;
        sqlx::query!(
            "delete from groups_roster where contact_jid = ?",
            contact.user_jid
        )
        .execute(&self.db)
        .await?;
        // TODO: delete orphaned groups from groups table
        for group in contact.groups {
            sqlx::query!(
                "insert into groups (group_name) values (?) on conflict do nothing",
                group
            )
            .execute(&self.db)
            .await?;
            sqlx::query!(
                "insert into groups_roster (group_name, contact_jid) values (?, ?)",
                group,
                contact.user_jid
            )
            .execute(&self.db)
            .await?;
        }
        Ok(())
    }

    pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
        sqlx::query!(
            "insert into users ( jid ) values ( ? ) on conflict do nothing",
            contact.user_jid,
        )
        .execute(&self.db)
        .await?;
        sqlx::query!(
            "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
            contact.user_jid,
            contact.name,
            contact.subscription,
            contact.name,
            contact.subscription
        )
        .execute(&self.db)
        .await?;
        sqlx::query!(
            "delete from groups_roster where contact_jid = ?",
            contact.user_jid
        )
        .execute(&self.db)
        .await?;
        // TODO: delete orphaned groups from groups table
        for group in contact.groups {
            sqlx::query!(
                "insert into groups (group_name) values (?) on conflict do nothing",
                group
            )
            .execute(&self.db)
            .await?;
            sqlx::query!(
                "insert into groups_roster (group_name, contact_jid) values (?, ?)",
                group,
                contact.user_jid
            )
            .execute(&self.db)
            .await?;
        }
        Ok(())
    }

    pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
        sqlx::query!("delete from roster where user_jid = ?", contact)
            .execute(&self.db)
            .await?;
        // TODO: delete orphaned groups from groups table
        Ok(())
    }

    pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
        sqlx::query!("delete from roster").execute(&self.db).await?;
        for contact in roster {
            self.upsert_contact(contact).await?;
        }
        Ok(())
    }

    pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
        let mut roster: Vec<Contact> = sqlx::query_as("select * from roster")
            .fetch_all(&self.db)
            .await?;
        for contact in &mut roster {
            #[derive(sqlx::FromRow)]
            struct Row {
                group_name: String,
            }
            let groups: Vec<Row> =
                sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
                    .bind(&contact.user_jid)
                    .fetch_all(&self.db)
                    .await?;
            contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
        }
        Ok(roster)
    }

    pub(crate) async fn read_cached_roster_with_users(
        &self,
    ) -> Result<Vec<(Contact, User)>, Error> {
        #[derive(sqlx::FromRow)]
        struct Row {
            #[sqlx(flatten)]
            contact: Contact,
            #[sqlx(flatten)]
            user: User,
        }
        let mut roster: Vec<Row> =
            sqlx::query_as("select * from roster join users on jid = user_jid")
                .fetch_all(&self.db)
                .await?;
        for row in &mut roster {
            #[derive(sqlx::FromRow)]
            struct Row {
                group_name: String,
            }
            let groups: Vec<Row> =
                sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
                    .bind(&row.contact.user_jid)
                    .fetch_all(&self.db)
                    .await?;
            row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
        }
        let roster = roster
            .into_iter()
            .map(|row| (row.contact, row.user))
            .collect();
        Ok(roster)
    }

    pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
        let id = Uuid::new_v4();
        let jid = chat.correspondent();
        sqlx::query!(
            "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)",
            id,
            jid,
            chat.have_chatted,
        )
        .execute(&self.db)
        .await?;
        Ok(())
    }

    // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.

    pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
        // check if the chat correponding with the jid exists
        let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
            .bind(chat)
            .fetch_one(&self.db)
            .await?;
        Ok(chat)
    }

    pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
        let jid = chat.as_bare();
        sqlx::query!(
            "update chats set have_chatted = true where correspondent = ?",
            jid
        )
        .execute(&self.db)
        .await?;
        Ok(())
    }

    pub(crate) async fn update_chat_correspondent(
        &self,
        old_chat: Chat,
        new_correspondent: JID,
    ) -> Result<Chat, Error> {
        // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter)
        let new_jid = &new_correspondent;
        let old_jid = old_chat.correspondent();
        sqlx::query!(
            "update chats set correspondent = ? where correspondent = ?",
            new_jid,
            old_jid,
        )
        .execute(&self.db)
        .await?;
        let chat = self.read_chat(new_correspondent).await?;
        Ok(chat)
    }

    // pub(crate) async fn update_chat

    pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
        sqlx::query!("delete from chats where correspondent = ?", chat)
            .execute(&self.db)
            .await?;
        Ok(())
    }

    /// TODO: sorting and filtering (for now there is no sorting)
    pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
        let chats: Vec<Chat> = sqlx::query_as("select * from chats")
            .fetch_all(&self.db)
            .await?;
        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
        let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
            .fetch_all(&self.db)
            .await?;
        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages(
        &self,
    ) -> Result<Vec<(Chat, Message)>, Error> {
        #[derive(sqlx::FromRow)]
        pub struct RowChat {
            chat_correspondent: JID,
            chat_have_chatted: bool,
        }
        impl From<RowChat> for Chat {
            fn from(value: RowChat) -> Self {
                Self {
                    correspondent: value.chat_correspondent,
                    have_chatted: value.chat_have_chatted,
                }
            }
        }
        #[derive(sqlx::FromRow)]
        pub struct RowMessage {
            message_id: Uuid,
            message_body: String,
            message_delivery: Option<Delivery>,
            message_timestamp: DateTime<Utc>,
            message_from_jid: JID,
        }
        impl From<RowMessage> for Message {
            fn from(value: RowMessage) -> Self {
                Self {
                    id: value.message_id,
                    from: value.message_from_jid,
                    delivery: value.message_delivery,
                    timestamp: value.message_timestamp,
                    body: Body {
                        body: value.message_body,
                    },
                }
            }
        }

        #[derive(sqlx::FromRow)]
        pub struct ChatWithMessageRow {
            #[sqlx(flatten)]
            pub chat: RowChat,
            #[sqlx(flatten)]
            pub message: RowMessage,
        }

        pub struct ChatWithMessage {
            chat: Chat,
            message: Message,
        }

        impl From<ChatWithMessageRow> for ChatWithMessage {
            fn from(value: ChatWithMessageRow) -> Self {
                Self {
                    chat: value.chat.into(),
                    message: value.message.into(),
                }
            }
        }

        // TODO: sometimes chats have no messages.
        let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
            .fetch_all(&self.db)
            .await?;

        let chats = chats
            .into_iter()
            .map(|chat_with_message_row| {
                let chat_with_message: ChatWithMessage = chat_with_message_row.into();
                (chat_with_message.chat, chat_with_message.message)
            })
            .collect();

        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
        #[derive(sqlx::FromRow)]
        pub struct RowChat {
            chat_correspondent: JID,
            chat_have_chatted: bool,
        }
        impl From<RowChat> for Chat {
            fn from(value: RowChat) -> Self {
                Self {
                    correspondent: value.chat_correspondent,
                    have_chatted: value.chat_have_chatted,
                }
            }
        }
        #[derive(sqlx::FromRow)]
        pub struct RowMessage {
            message_id: Uuid,
            message_body: String,
            message_delivery: Option<Delivery>,
            message_timestamp: DateTime<Utc>,
            message_from_jid: JID,
        }
        impl From<RowMessage> for Message {
            fn from(value: RowMessage) -> Self {
                Self {
                    id: value.message_id,
                    from: value.message_from_jid,
                    delivery: value.message_delivery,
                    timestamp: value.message_timestamp,
                    body: Body {
                        body: value.message_body,
                    },
                }
            }
        }
        #[derive(sqlx::FromRow)]
        pub struct RowChatUser {
            chat_user_jid: JID,
            chat_user_nick: Option<String>,
            chat_user_avatar: Option<String>,
        }
        impl From<RowChatUser> for User {
            fn from(value: RowChatUser) -> Self {
                Self {
                    jid: value.chat_user_jid,
                    nick: value.chat_user_nick,
                    avatar: value.chat_user_avatar,
                }
            }
        }
        #[derive(sqlx::FromRow)]
        pub struct RowMessageUser {
            message_user_jid: JID,
            message_user_nick: Option<String>,
            message_user_avatar: Option<String>,
        }
        impl From<RowMessageUser> for User {
            fn from(value: RowMessageUser) -> Self {
                Self {
                    jid: value.message_user_jid,
                    nick: value.message_user_nick,
                    avatar: value.message_user_avatar,
                }
            }
        }
        #[derive(sqlx::FromRow)]
        pub struct ChatWithMessageAndUsersRow {
            #[sqlx(flatten)]
            pub chat: RowChat,
            #[sqlx(flatten)]
            pub chat_user: RowChatUser,
            #[sqlx(flatten)]
            pub message: RowMessage,
            #[sqlx(flatten)]
            pub message_user: RowMessageUser,
        }

        impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers {
            fn from(value: ChatWithMessageAndUsersRow) -> Self {
                Self {
                    chat: value.chat.into(),
                    chat_user: value.chat_user.into(),
                    message: value.message.into(),
                    message_user: value.message_user.into(),
                }
            }
        }

        pub struct ChatWithMessageAndUsers {
            chat: Chat,
            chat_user: User,
            message: Message,
            message_user: User,
        }

        let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")
            .fetch_all(&self.db)
            .await?;

        let chats = chats
            .into_iter()
            .map(|chat_with_message_and_users_row| {
                let chat_with_message_and_users: ChatWithMessageAndUsers =
                    chat_with_message_and_users_row.into();
                (
                    (
                        chat_with_message_and_users.chat,
                        chat_with_message_and_users.chat_user,
                    ),
                    (
                        chat_with_message_and_users.message,
                        chat_with_message_and_users.message_user,
                    ),
                )
            })
            .collect();

        Ok(chats)
    }

    async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
        #[derive(sqlx::FromRow)]
        struct Row {
            id: Uuid,
        }
        let chat = chat.as_bare();
        let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
            .bind(chat)
            .fetch_one(&self.db)
            .await?;
        let chat_id = chat_id.id;
        Ok(chat_id)
    }

    async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
        #[derive(sqlx::FromRow)]
        struct Row {
            id: Uuid,
        }
        let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?")
            .bind(chat)
            .fetch_optional(&self.db)
            .await?;
        let chat_id = chat_id.map(|row| row.id);
        Ok(chat_id)
    }

    /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
    pub(crate) async fn create_message(
        &self,
        message: Message,
        chat: JID,
        from: JID,
    ) -> Result<(), Error> {
        // TODO: one query
        let from_jid = from.as_bare();
        let chat_id = self.read_chat_id(chat).await?;
        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;
        Ok(())
    }

    pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {
        let bare_chat = chat.as_bare();
        sqlx::query!(
            "insert into users (jid) values (?) on conflict do nothing",
            bare_chat,
        )
        .execute(&self.db)
        .await?;
        let id = Uuid::new_v4();
        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?")
            .bind(id)
            .bind(bare_chat.clone())
            .bind(false)
            .bind(bare_chat)
            .fetch_one(&self.db)
            .await?;
        tracing::debug!("CHECKING chat: {:?}", chat);
        Ok(chat.have_chatted)
    }

    /// MUST upsert chat beforehand
    pub(crate) async fn create_message_with_self_resource(
        &self,
        message: Message,
        chat: JID,
        // full jid
        from: JID,
    ) -> Result<(), Error> {
        let from_jid = from.as_bare();
        if let Some(resource) = &from.resourcepart {
            sqlx::query!(
                "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
                from_jid,
                resource
            )
            .execute(&self.db)
            .await?;
        }
        self.create_message(message, chat, from).await?;
        Ok(())
    }

    /// create direct message from incoming. MUST upsert chat and user
    pub(crate) async fn create_message_with_user_resource(
        &self,
        message: Message,
        chat: JID,
        // full jid
        from: JID,
    ) -> Result<(), Error> {
        let bare_chat = chat.as_bare();
        let resource = &chat.resourcepart;
        // sqlx::query!(
        //     "insert into users (jid) values (?) on conflict do nothing",
        //     bare_chat
        // )
        // .execute(&self.db)
        // .await?;
        // let id = Uuid::new_v4();
        // sqlx::query!(
        //     "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
        //     id,
        //     bare_chat
        // )
        // .execute(&self.db)
        // .await?;
        if let Some(resource) = resource {
            sqlx::query!(
                "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
                bare_chat,
                resource
            )
            .execute(&self.db)
            .await?;
        }
        self.create_message(message, chat, from).await?;
        Ok(())
    }

    pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
        let message: Message = sqlx::query_as("select * from messages where id = ?")
            .bind(message)
            .fetch_one(&self.db)
            .await?;
        Ok(message)
    }

    // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}

    pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
        sqlx::query!("delete from messages where id = ?", message)
            .execute(&self.db)
            .await?;
        Ok(())
    }

    // TODO: paging
    pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
        let chat_id = self.read_chat_id(chat).await?;
        let messages: Vec<Message> =
            sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc")
                .bind(chat_id)
                .fetch_all(&self.db)
                .await?;
        Ok(messages)
    }

    pub(crate) async fn read_message_history_with_users(
        &self,
        chat: JID,
    ) -> Result<Vec<(Message, User)>, Error> {
        let chat_id = self.read_chat_id(chat).await?;
        #[derive(sqlx::FromRow)]
        pub struct Row {
            #[sqlx(flatten)]
            user: User,
            #[sqlx(flatten)]
            message: Message,
        }
        let messages: Vec<Row> =
            sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc")
                .bind(chat_id)
                .fetch_all(&self.db)
                .await?;
        let messages = messages
            .into_iter()
            .map(|row| (row.message, row.user))
            .collect();
        Ok(messages)
    }

    pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
        let online: Online = sqlx::query_as("select * from cached_status where id = 0")
            .fetch_one(&self.db)
            .await?;
        Ok(online)
    }

    pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
        sqlx::query!(
            "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
            status.show,
            status.status,
            status.show,
            status.status
        ).execute(&self.db).await?;
        Ok(())
    }

    pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
        sqlx::query!("update cached_status set show = null, message = null where id = 0")
            .execute(&self.db)
            .await?;
        Ok(())
    }

    pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> {
        #[derive(sqlx::FromRow)]
        struct Row {
            capabilities: String,
        }
        let row: Row =
            sqlx::query_as("select capabilities from capability_hash_nodes where node = ?")
                .bind(node)
                .fetch_one(&self.db)
                .await?;
        Ok(row.capabilities)
    }

    pub(crate) async fn upsert_capabilities(
        &self,
        node: &str,
        capabilities: &str,
    ) -> Result<(), Error> {
        let now = Utc::now();
        sqlx::query!(
            "insert into capability_hash_nodes (node, timestamp, capabilities) values (?, ?, ?) on conflict do update set timestamp = ?, capabilities = ?", node, now, capabilities, now, capabilities
        ).execute(&self.db).await?;
        Ok(())
    }
}