aboutsummaryrefslogblamecommitdiffstats
path: root/filamento/src/db.rs
blob: 79fdd9a237ba749073f74c3d4625150ece75d22a (plain) (tree)
1
2
3
4
5
6
7
                       
                                                                   
 
                            
                                 

                                              




                                         


               
                                          
                                                       
                     



                    
                       
               
                                             


                   
                                                   







                                      
                                                 
                   

 













                                                                                                    
         
                                       
                                            

                                          





                                                           



                                                                                         





                                                           





                                                       
                                          

                                         
                                                           



                                                            
                                                          



                                                                    









                                                            





                                                                                        
                                     

















                                                                                         
                                                           





                                                            

                                                                          

















                                                                             
                                  




                                      
                                                                                

                                                           
                                  




                                                       
                                                                                      

                                                                
                                  




                                                       
                                                                                                    

                                                                      
                                  






                                                                                                            
                     


                                                                  
                                  






                                                                                                            
                     







                                                   
                                  







                                                                             
                                  










                                                                                                                                                                                                                                                              
                                  



                                 
                                                                                         

                                                                 
                                  



                                 



                                         

                                                                    
                                  







                                                                                                    
                                  






                                                                                      
                                  



                                 
                                                                                      

                                                                   
                                  






                                                                                                 
                                  






                                                                                  
                                  








                                                                      
                                  






                                                                             
                                  





                                                                                                                                                                                                                       
                                                                                

                                                           
                                  



                                 
                                                                                                 






                                                                  
                                                                                         

                                                                    
                                  



                                 

                                      
                                                                                

                                                             
                                  







                                                                       
                                  








                                                                               
                                  










                                                                               
                                  










                                                                                       
                                  








                                                                                                                    
                      
                      







                                                
                                  



                                 
                                                                                           

                                                                    
                                  



                                 










                                                        
                                  















                                                                                                                      




                                                                         
                                                                                                              
                       
                                                                                  


                                                              











                                                     
                                  





                                                 
                





                                                   
                      


                                                     
                     


                                                     
                     



                                                     
                     


                                                                       
                     











                                                                       
                         


                                                        
                         










                                                                
                         
















                                                                     
                      

                                                     
                     
                      

                                                             
                       
                      

                                                   
                
                      























                                                                                     
                      
                      


                                                   
                      

                                                     













                                                        
                      


                                                             
                      






















                                                                     



                                                                        


















                                                                                           






                                                                                                                             











                                                                                                      


     


                                       



                                                     


                                                   
                                              
             
                                                   

                             
                                      

                          
                 



                                                       

                                                     
                          

                                                               




                                       


                                                     


                                                               

     
                                     
                                  


                                                     

                                                               
                                 

     
                                     
                                  

                                    
                                                     
                                          
                                              
                                                               
                                 

     



                                                          
































                                                                      
                                                                
























                                                                      


                                                                


                                                                  

























                                                                                          






































                                                                                 
                                                           
         
                                                    
     
 
                                                                       
         
                            



                                                                                


              
                                      
                                                                          
                          























                                                                                

     
                                                       
                                                                                

                          
                                                                                                                                                                                                              





                              


                                                       
                                                                                              

                          
                                                                                                                                                                                   





                              


                                                                                                            
                                                                                                    

                                                             
                              











                                                                                                                                                                                                               


                                                                                                            
                                     
              
                     

                                                

                                                             
                              













                                                                                                                                                                                          

     
                                       

                                                                       


                                                                          



                                                                                                                                                                                                                                                              
                                                                          

                                                                                                                       

                                                                                













                                                                                        


              
                                                                                   
                          

















                                                                                    

     
                                                                                               
                          























                                                                                        


                                                                                                    

                                                                                


















                                                                                       


              

                                                                                





















                                                                                                                                               


              
                                                                                
               
                                                                            


              
                                                                                           
         
                                                       

                               
                                          
         


              

                                                                            


















                                                                                        

     

                                                                                               






















                                                                                                                                                                                   

     
                                                                       
                                       




                                                          
                        
                                                                                      
                                              
           


              
                                



                                                                          
                                                                                                                                      








                                                   












                                                
                                                                                           

                                                 
                                                                                                                                                                                    





















                                                   
                                              
                                                
                                             


                                

     
                                                                                   
                        
                                                                                                                            

                   


              
                                                                          
               
                                                                                                                            



                                                                 
                                                                 
                        
               
                                                                                                                     







                                               

     

                                             
                                                                         
                        
               
                                                                                                                                                                                                                                                                                                                                                                                             







                                               

     

                                             
                                                          

                                              
                            
               























                                                          










                                                   



                                                         



                                             



                                             
                                                                    

                                                              
                        
               


































                                                          










                                                                          
                          









                                                                     

                                                       




                                                                    
                          




                                             

     
                          
                                                                  
                                        
                                                                                                            



                                  

     
                                                                              
                          
               
                       
                                                                                                                




                                      


                                                                                                                    
                                                                                              
                          
                                 

                         
                      
                      
                            






                                                                                  
                                            
                                                                                                                                                                                                                                             


              


                                                                                      
                                                                         
                    
           






                                                                                              
                    
                                  
           
                    

     






                                                                                                                                                        
           






                                                                                      

     
                                          



                            
                        





                                                              
                                                                                   







                                  
 
                                                                                                                
 

                                                                             
                                                                       


              

                                                                                
                                                                                                                                                               








                                                     

                                              





                   
                   
                                                                                             
                                               
                           
               
                     
                                                                                                                                                                                                               



                                         



                                                     

                                              



                                             
     
 
                                                  
              
                      
                                              
                                               
                           
               
                     
                                                                                                                                                                                                                                









                                                         

                                                  









                                             

     

                                                                      










                                                                   

     

                                                                                                                                                                                                           


              

                                                                    


                                                                                

              
 

                                                                                   




                                                                             

     
                                      
              

                             
                            
                             
                                                                                                                                                                                                                            

              
 
use core::fmt::Display;
use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc};

use chrono::{DateTime, Utc};
use jid::{BareJID, FullJID, JID};
use rusqlite::{Connection, OptionalExtension};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{spawn, spawn_blocking};
#[cfg(target_arch = "wasm32")]
use tokio_with_wasm::alias as tokio;
use tracing::debug;
use uuid::Uuid;

use crate::{
    chat::{Body, Chat, Delivery, Message},
    error::{DatabaseError as Error, DatabaseOpenError},
    presence::Online,
    roster::Contact,
    user::User,
};

#[derive(Clone, Debug)]
pub struct Db {
    sender: mpsc::UnboundedSender<DbCommand>,
}

impl Deref for Db {
    type Target = mpsc::UnboundedSender<DbCommand>;

    fn deref(&self) -> &Self::Target {
        &self.sender
    }
}

#[derive(Debug)]
pub struct DbActor {
    receiver: mpsc::UnboundedReceiver<DbCommand>,
    db: Connection,
}

macro_rules! impl_db_sends {
	($($command:ident => $name:ident($($arg:ident: $arg_t:ty),*) -> $ret:ty);*) => {
		$(
			pub(crate) async fn $name(&self, $($arg: $arg_t),*) -> Result<$ret, Error> {
				let (result, recv) = oneshot::channel();
				let command = DbCommand::$command { $($arg,)* result };
				let _ = self.sender.send(command);
				let result = recv.await?;
				result
			}
		)*
	}
}

impl Db {
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn create_connect_and_migrate(
        path: impl AsRef<Path> + Send,
    ) -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::unbounded_channel();

        let actor = DbActor::new(path, receiver)?;
        spawn_blocking(move || actor.run());

        Ok(Self { sender })
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::unbounded_channel();

        let actor = DbActor::new_memory(receiver)?;
        spawn_blocking(move || actor.run());

        Ok(Self { sender })
    }

    /// `file_name` should be a file not in a directory
    #[cfg(target_arch = "wasm32")]
    pub async fn create_connect_and_migrate(
        file_name: impl AsRef<str> + Send + 'static,
    ) -> Result<Self, DatabaseOpenError> {
        use tokio_with_wasm::spawn_local;

        let (sender, receiver) = mpsc::unbounded_channel();
        let (result_send, result_recv) = oneshot::channel();
        spawn_blocking(move || {
            spawn_local(async move {
                debug!("installing opfs in spawn");
                match rusqlite::ffi::install_opfs_sahpool(
                    Some(&rusqlite::ffi::OpfsSAHPoolCfg::default()),
                    false,
                )
                .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        use crate::error::OpfsSAHError;

                        let error: OpfsSAHError = e.into();
                        result_send.send(Err(error.into()));
                        return;
                    }
                }
                debug!("opfs installed");
                let file_name = format!("file:{}?vfs=opfs-sahpool", file_name.as_ref());
                let result = DbActor::new(file_name, receiver);
                match result {
                    Ok(a) => {
                        result_send.send(Ok(()));
                        a.run().await
                    }
                    Err(e) => {
                        result_send.send(Err(e));
                    }
                }
            });
        });
        match result_recv.await {
            Ok(r) => match r {
                Ok(o) => Ok(Self { sender }),
                Err(e) => return Err(e),
            },
            Err(e) => return Err(e.into()),
        }
    }

    #[cfg(target_arch = "wasm32")]
    pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::unbounded_channel();
        let (result_send, result_recv) = oneshot::channel();
        spawn_blocking(move || {
            let result = DbActor::new_memory(receiver);
            match result {
                Ok(a) => {
                    result_send.send(Ok(()));
                    tokio_with_wasm::spawn_local(async { a.run().await });
                    // a.run()
                }
                Err(e) => {
                    result_send.send(Err(e));
                }
            }
        });
        match result_recv.await {
            Ok(r) => match r {
                Ok(o) => Ok(Self { sender }),
                Err(e) => return Err(e),
            },
            Err(e) => return Err(e.into()),
        }
    }

    pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::CreateUser { user, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // TODO: this is not a 'read' user
    pub(crate) async fn read_user(&self, user: BareJID) -> Result<User, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadUser { user, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// returns whether or not the nickname was updated
    pub(crate) async fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::DeleteUserNick { jid, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// returns whether or not the nickname was updated
    pub(crate) async fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpsertUserNick { jid, nick, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) async fn delete_user_avatar(
        &self,
        jid: BareJID,
    ) -> Result<(bool, Option<String>), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::DeleteUserAvatar { jid, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) async fn upsert_user_avatar(
        &self,
        jid: BareJID,
        avatar: String,
    ) -> Result<(bool, Option<String>), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpsertUserAvatar {
            jid,
            avatar,
            result,
        };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // TODO: use references everywhere
    pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpdateUser { user, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
    // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}

    /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
    pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::CreateContact { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadContact { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn read_contact_opt(
        &self,
        contact: BareJID,
    ) -> Result<Option<Contact>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadContactOpt { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// does not update the underlying user, to update user, update_user() must be called separately
    pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpdateContact { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpsertContact { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::DeleteContact { contact, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReplaceCachedRoster { roster, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadCachedRoster { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn read_cached_roster_with_users(
        &self,
    ) -> Result<Vec<(Contact, User)>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadCachedRosterWithUsers { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::CreateChat { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.

    pub(crate) async fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChat { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChatAndUser { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::MarkChatAsChatted { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // pub(crate) async fn update_chat

    pub(crate) async fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::DeleteChat { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// TODO: sorting and filtering (for now there is no sorting)
    pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChats { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChatsOrdered { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages(
        &self,
    ) -> Result<Vec<(Chat, Message)>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChatsOrderedWithLatestMessages { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
    #[tracing::instrument]
    pub(crate) async fn create_message(
        &self,
        message: Message,
        chat: BareJID,
        from: BareJID,
    ) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::CreateMessage {
            message,
            chat,
            from,
            result,
        };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn upsert_chat_and_user(&self, chat: BareJID) -> Result<bool, Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpsertChatAndUser { chat, result };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    pub(crate) async fn update_message_delivery(
        &self,
        message: Uuid,
        delivery: Delivery,
    ) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpdateMessageDelivery {
            message,
            delivery,
            result,
        };
        self.sender.send(command);
        let result = recv.await?;
        result
    }

    // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
    //     Ok(Message {
    //         id: Uuid,
    //         from: todo!(),
    //         delivery: todo!(),
    //         timestamp: todo!(),
    //         body: todo!(),
    //     })
    // }

    // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}

    impl_db_sends!(
        ReadCapabilities => read_capabilities(node: String) -> String;
        DeleteCachedStatus => delete_cached_status() -> ();
        UpsertCachedStatus => upsert_cached_status(status: Online) -> ();
        ReadCachedStatus => read_cached_status() -> Online;
        ReadMessageHistoryWithUsers => read_message_history_with_users(chat: BareJID) -> Vec<(Message, User)>;
        // TODO: paging
        ReadMessageHistory => read_message_history(chat: BareJID) -> Vec<Message>;
        ReadMessage => read_message(message: Uuid) -> Message;
        DeleteMessage => delete_message(message: Uuid) -> ()
    );

    pub(crate) async fn upsert_capabilities(
        &self,
        node: String,
        capabilities: String,
    ) -> Result<(), Error> {
        let (result, recv) = oneshot::channel();
        let command = DbCommand::UpsertCapabilities {
            node,
            capabilities,
            result,
        };
        self.sender.send(command);
        let result = recv.await?;
        result
    }
}

// TODO: i should really just make an actor macro
#[derive(Debug)]
pub enum DbCommand {
    CreateUser {
        user: User,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadUser {
        user: BareJID,
        result: oneshot::Sender<Result<User, Error>>,
    },
    DeleteUserNick {
        jid: BareJID,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    UpsertUserNick {
        jid: BareJID,
        nick: String,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    DeleteUserAvatar {
        jid: BareJID,
        result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
    },
    UpsertUserAvatar {
        jid: BareJID,
        avatar: String,
        result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
    },
    UpdateUser {
        user: User,
        result: oneshot::Sender<Result<(), Error>>,
    },
    CreateContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadContact {
        contact: BareJID,
        result: oneshot::Sender<Result<Contact, Error>>,
    },
    ReadContactOpt {
        contact: BareJID,
        result: oneshot::Sender<Result<Option<Contact>, Error>>,
    },
    UpdateContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    UpsertContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    DeleteContact {
        contact: BareJID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReplaceCachedRoster {
        roster: Vec<Contact>,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadCachedRoster {
        result: oneshot::Sender<Result<Vec<Contact>, Error>>,
    },
    ReadCachedRosterWithUsers {
        result: oneshot::Sender<Result<Vec<(Contact, User)>, Error>>,
    },
    CreateChat {
        chat: Chat,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadChat {
        chat: BareJID,
        result: oneshot::Sender<Result<Chat, Error>>,
    },
    ReadChatAndUser {
        chat: BareJID,
        result: oneshot::Sender<Result<(Chat, User), Error>>,
    },
    MarkChatAsChatted {
        chat: BareJID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    DeleteChat {
        chat: BareJID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadChats {
        result: oneshot::Sender<Result<Vec<Chat>, Error>>,
    },
    ReadChatsOrdered {
        result: oneshot::Sender<Result<Vec<Chat>, Error>>,
    },
    ReadChatsOrderedWithLatestMessages {
        result: oneshot::Sender<Result<Vec<(Chat, Message)>, Error>>,
    },
    ReadChatsOrderedWithLatestMessagesAndUsers {
        result: oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, Error>>,
    },
    // ReadChatID {

    //     result: oneshot::Sender<Result<, Error>>,
    // },
    // ReadChatIDOpt {
    //     chat: JID,
    //     result: oneshot::Sender<Result<Option<Uuid>, Error>>,
    // },
    CreateMessage {
        message: Message,
        chat: BareJID,
        from: BareJID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    UpsertChatAndUser {
        chat: BareJID,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    UpdateMessageDelivery {
        message: Uuid,
        delivery: Delivery,
        result: oneshot::Sender<Result<(), Error>>,
    },
    DeleteMessage {
        message: Uuid,
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadMessage {
        message: Uuid,
        result: oneshot::Sender<Result<Message, Error>>,
    },
    ReadMessageHistory {
        chat: BareJID,
        result: oneshot::Sender<Result<Vec<Message>, Error>>,
    },
    ReadMessageHistoryWithUsers {
        chat: BareJID,
        result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>,
    },
    ReadCachedStatus {
        result: oneshot::Sender<Result<Online, Error>>,
    },
    UpsertCachedStatus {
        status: Online,
        result: oneshot::Sender<Result<(), Error>>,
    },
    DeleteCachedStatus {
        result: oneshot::Sender<Result<(), Error>>,
    },
    ReadCapabilities {
        node: String,
        result: oneshot::Sender<Result<String, Error>>,
    },
    UpsertCapabilities {
        node: String,
        capabilities: String,
        result: oneshot::Sender<Result<(), Error>>,
    },
}

impl Display for DbCommand {
    #[rustfmt::skip]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            DbCommand::CreateUser { user, result } => "CreateUser",
            DbCommand::ReadUser { user, result } => "ReadUser",
            DbCommand::DeleteUserNick { jid, result } => "DeleteUserNick",
            DbCommand::UpsertUserNick { jid, nick, result } => "UpsertUserNick",
            DbCommand::DeleteUserAvatar { jid, result } => "DeleteUserAvatar",
            DbCommand::UpsertUserAvatar { jid, avatar, result } => "UpsertUserAvatar",
            DbCommand::UpdateUser { user, result } => "UpdateUser",
            DbCommand::CreateContact { contact, result } => "CreateContact",
            DbCommand::ReadContact { contact, result } => "ReadContact",
            DbCommand::ReadContactOpt { contact, result } => "ReadContactOpt",
            DbCommand::UpdateContact { contact, result } => "UpdateContact",
            DbCommand::UpsertContact { contact, result } => "UpsertContact",
            DbCommand::DeleteContact { contact, result } => "DeleteContact",
            DbCommand::ReplaceCachedRoster { roster, result } => "ReplaceCachedRoster",
            DbCommand::ReadCachedRoster { result } => "ReadCachedRoster",
            DbCommand::ReadCachedRosterWithUsers { result } => "ReadCachedRosterWithUsers",
            DbCommand::CreateChat { chat, result } => "CreateChat",
            DbCommand::ReadChat { chat, result } => "ReadChat",
            DbCommand::MarkChatAsChatted { chat, result } => "MarkChatAsChatted",
            DbCommand::DeleteChat { chat, result } => "DeleteChat",
            DbCommand::ReadChats { result } => "ReadChats",
            DbCommand::ReadChatsOrdered { result } => "ReadChatsOrdered",
            DbCommand::ReadChatsOrderedWithLatestMessages { result } => "ReadChatsOrderedWithLatestMessages",
            DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => "ReadChatsOrderedWithLatestMessagesAndUsers",
            DbCommand::CreateMessage { message, chat, from, result } => "CreateMessage",
            DbCommand::UpsertChatAndUser { chat, result } => "UpsertChatAndUser",
            DbCommand::UpdateMessageDelivery { message, delivery, result } => "UpdateMessageDelivery",
            DbCommand::DeleteMessage { message, result } => "DeleteMessage",
            DbCommand::ReadMessage { message, result } => "ReadMessage",
            DbCommand::ReadMessageHistory { chat, result } => "ReadMessageHistory",
            DbCommand::ReadMessageHistoryWithUsers { chat, result } => "ReadMessageHistoryWithUsers",
            DbCommand::ReadCachedStatus { result } => "ReadCachedStatus",
            DbCommand::UpsertCachedStatus { status, result } => "UpsertCachedStatus",
            DbCommand::DeleteCachedStatus { result } => "DeleteCachedStatus",
            DbCommand::ReadCapabilities { node, result } => "ReadCapabilities",
            DbCommand::UpsertCapabilities { node, capabilities, result } => "UpsertCapabilities",
            DbCommand::ReadChatAndUser { chat, result } => "ReadChatAndUser",
        })
    }
}

impl DbActor {
    /// must be run in blocking spawn
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn new(
        path: impl AsRef<Path>,
        receiver: mpsc::UnboundedReceiver<DbCommand>,
    ) -> Result<Self, DatabaseOpenError> {
        if let Some(dir) = path.as_ref().parent() {
            if dir.is_dir() {
            } else {
                std::fs::create_dir_all(dir)?;
            }
            let _file = std::fs::OpenOptions::new()
                .append(true)
                .create(true)
                .open(path.as_ref())?;
        }
        let url = format!(
            "{}",
            path.as_ref()
                .to_str()
                .ok_or(DatabaseOpenError::InvalidPath)?
        );
        // let db = SqlitePool::connect(&url).await?;
        // migrate!().run(&db).await?;
        // Ok(Self { db })
        let db = Connection::open(url)?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    /// must be run in blocking spawn
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn new_memory(
        receiver: mpsc::UnboundedReceiver<DbCommand>,
    ) -> Result<Self, DatabaseOpenError> {
        let db = Connection::open_in_memory()?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    /// must be run in blocking spawn
    #[cfg(target_arch = "wasm32")]
    pub fn new_memory(
        receiver: mpsc::UnboundedReceiver<DbCommand>,
    ) -> Result<Self, DatabaseOpenError> {
        let db = Connection::open("mem.db")?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    /// must be run in blocking spawn
    #[cfg(target_arch = "wasm32")]
    pub fn new(
        file_name: impl AsRef<Path>,
        receiver: mpsc::UnboundedReceiver<DbCommand>,
    ) -> Result<Self, DatabaseOpenError> {
        let db = Connection::open(file_name)?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    pub(crate) async fn run(mut self) {
        while let Some(cmd) = self.receiver.recv().await {
            let cmd_name = cmd.to_string();
            tracing::warn!("command recv: {cmd_name}");
            match cmd {
                DbCommand::CreateUser { user, result } => {
                    result.send(self.create_user(user));
                }
                DbCommand::ReadUser { user, result } => {
                    result.send(self.read_user(user));
                }
                DbCommand::DeleteUserNick { jid, result } => {
                    result.send(self.delete_user_nick(jid));
                }
                DbCommand::UpsertUserNick { jid, nick, result } => {
                    result.send(self.upsert_user_nick(jid, nick));
                }
                DbCommand::DeleteUserAvatar { jid, result } => {
                    result.send(self.delete_user_avatar(jid));
                }
                DbCommand::UpsertUserAvatar {
                    jid,
                    avatar,
                    result,
                } => {
                    result.send(self.upsert_user_avatar(jid, avatar));
                }
                DbCommand::UpdateUser { user, result } => {
                    result.send(self.update_user(user));
                }
                DbCommand::CreateContact { contact, result } => {
                    result.send(self.create_contact(contact));
                }
                DbCommand::ReadContact { contact, result } => {
                    result.send(self.read_contact(contact));
                }
                DbCommand::ReadContactOpt { contact, result } => {
                    result.send(self.read_contact_opt(contact));
                }
                DbCommand::UpdateContact { contact, result } => {
                    result.send(self.update_contact(contact));
                }
                DbCommand::UpsertContact { contact, result } => {
                    result.send(self.upsert_contact(contact));
                }
                DbCommand::DeleteContact { contact, result } => {
                    result.send(self.delete_contact(contact));
                }
                DbCommand::ReplaceCachedRoster { roster, result } => {
                    result.send(self.replace_cached_roster(roster));
                }
                DbCommand::ReadCachedRoster { result } => {
                    result.send(self.read_cached_roster());
                }
                DbCommand::ReadCachedRosterWithUsers { result } => {
                    result.send(self.read_cached_roster_with_users());
                }
                DbCommand::CreateChat { chat, result } => {
                    result.send(self.create_chat(chat));
                }
                DbCommand::ReadChat { chat, result } => {
                    result.send(self.read_chat(chat));
                }
                DbCommand::ReadChatAndUser { chat, result } => {
                    result.send(self.read_chat_and_user(chat));
                }
                DbCommand::MarkChatAsChatted { chat, result } => {
                    result.send(self.mark_chat_as_chatted(chat));
                }
                DbCommand::DeleteChat { chat, result } => {
                    result.send(self.delete_chat(chat));
                }
                DbCommand::ReadChats { result } => {
                    result.send(self.read_chats());
                }
                DbCommand::ReadChatsOrdered { result } => {
                    result.send(self.read_chats_ordered());
                }
                DbCommand::ReadChatsOrderedWithLatestMessages { result } => {
                    result.send(self.read_chats_ordered_with_latest_messages());
                }
                DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => {
                    result.send(self.read_chats_ordered_with_latest_messages_and_users());
                }
                DbCommand::CreateMessage {
                    message,
                    chat,
                    from,
                    result,
                } => {
                    result.send(self.create_message(message, chat, from));
                }
                DbCommand::UpsertChatAndUser { chat, result } => {
                    result.send(self.upsert_chat_and_user(&chat));
                }
                DbCommand::UpdateMessageDelivery {
                    message,
                    delivery,
                    result,
                } => {
                    result.send(self.update_message_delivery(message, delivery));
                }
                DbCommand::DeleteMessage { message, result } => {
                    result.send(self.delete_message(message));
                }
                DbCommand::ReadMessage { message, result } => {
                    result.send(self.read_message(message));
                }
                DbCommand::ReadMessageHistory { chat, result } => {
                    result.send(self.read_message_history(chat));
                }
                DbCommand::ReadMessageHistoryWithUsers { chat, result } => {
                    result.send(self.read_message_history_with_users(chat));
                }
                DbCommand::ReadCachedStatus { result } => {
                    result.send(self.read_cached_status());
                }
                DbCommand::UpsertCachedStatus { status, result } => {
                    result.send(self.upsert_cached_status(status));
                }
                DbCommand::DeleteCachedStatus { result } => {
                    result.send(self.delete_cached_status());
                }
                DbCommand::ReadCapabilities { node, result } => {
                    result.send(self.read_capabilities(node));
                }
                DbCommand::UpsertCapabilities {
                    node,
                    capabilities,
                    result,
                } => {
                    result.send(self.upsert_capabilities(node, capabilities));
                }
            }
            tracing::warn!("command finished: {cmd_name}");
        }
        tracing::error!("command: db actor exited");
    }

    pub(crate) fn create_user(&self, user: User) -> Result<(), Error> {
        {
            self.db.execute(
                "insert into users ( jid, nick, avatar ) values ( ?1, ?2, ?3 )",
                (user.jid, user.nick, user.avatar),
            )?;
        }
        Ok(())
    }

    // TODO: this is not a 'read' user
    pub(crate) fn read_user(&self, user: BareJID) -> Result<User, Error> {
        let db = &self.db;
        let user_opt = db
            .query_row(
                "select jid, nick, avatar from users where jid = ?1",
                [&user],
                |row| {
                    Ok(User {
                        jid: row.get(0)?,
                        nick: row.get(1)?,
                        avatar: row.get(2)?,
                    })
                },
            )
            .optional()?;
        match user_opt {
            Some(user) => Ok(user),
            None => {
                db.execute("insert into users ( jid ) values ( ?1 )", [&user])?;
                Ok(User {
                    jid: user,
                    nick: None,
                    avatar: None,
                })
            }
        }
    }

    /// returns whether or not the nickname was updated
    pub(crate) fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
        let rows_affected;
        {
            rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
        }
        if rows_affected > 0 {
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// returns whether or not the nickname was updated
    pub(crate) fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
        let rows_affected;
        {
            rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?;
        }
        if rows_affected > 0 {
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) fn delete_user_avatar(&self, jid: BareJID) -> Result<(bool, Option<String>), Error> {
        let (old_avatar, rows_affected): (Option<String>, _);
        {
            let db = &self.db;
            old_avatar = db
                .query_row("select avatar from users where jid = ?1", [&jid], |row| {
                    Ok(row.get(0)?)
                })
                .optional()?;
            rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
        }
        if rows_affected > 0 {
            Ok((true, old_avatar))
        } else {
            Ok((false, old_avatar))
        }
    }

    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) fn upsert_user_avatar(
        &self,
        jid: BareJID,
        avatar: String,
    ) -> Result<(bool, Option<String>), Error> {
        let (old_avatar, rows_affected): (Option<String>, _);
        {
            let db = &self.db;
            old_avatar = db
                .query_row("select avatar from users where jid = ?1", [&jid], |row| {
                    let avatar: Option<String> = row.get(0)?;
                    Ok(avatar)
                })
                .optional()?
                .unwrap_or_default();
            rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, &avatar, &avatar, &avatar))?;
        }
        if rows_affected > 0 {
            Ok((true, old_avatar))
        } else {
            Ok((false, old_avatar))
        }
    }

    // TODO: use references everywhere?
    pub(crate) fn update_user(&self, user: User) -> Result<(), Error> {
        self.db.execute(
            "update users set nick = ?1, avatar = ?2 where user_jid = ?1",
            (&user.nick, &user.avatar, &user.jid),
        )?;
        Ok(())
    }

    // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
    // pub(crate) fn delete_user(&self, user: JID) -> Result<(), Error> {}

    /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
    pub(crate) fn create_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 )",
            (&contact.user_jid, &contact.name, contact.subscription),
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
                (group, &contact.user_jid),
            )?;
        }
        Ok(())
    }

    pub(crate) fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
        let db = &self.db;
        let mut contact_item = db.query_row(
            "select user_jid, name, subscription from roster where user_jid = ?1",
            [&contact],
            |row| {
                Ok(Contact {
                    user_jid: row.get(0)?,
                    name: row.get(1)?,
                    subscription: row.get(2)?,
                    groups: HashSet::new(),
                })
            },
        )?;
        let groups: Result<HashSet<String>, _> = db
            .prepare("select group_name from groups_roster where contact_jid = ?1")?
            .query_map([&contact], |row| Ok(row.get(0)?))?
            .collect();
        contact_item.groups = groups?;
        Ok(contact_item)
    }

    pub(crate) fn read_contact_opt(&self, contact: BareJID) -> Result<Option<Contact>, Error> {
        let db = &self.db;
        let contact_item = db
            .query_row(
                "select user_jid, name, subscription from roster where user_jid = ?1",
                [&contact],
                |row| {
                    Ok(Contact {
                        user_jid: row.get(0)?,
                        name: row.get(1)?,
                        subscription: row.get(2)?,
                        groups: HashSet::new(),
                    })
                },
            )
            .optional()?;
        if let Some(mut contact_item) = contact_item {
            let groups: Result<HashSet<String>, _> = db
                .prepare("select group_name from groups_roster where contact_jid = ?1")?
                .query_map([&contact], |row| Ok(row.get(0)?))?
                .collect();
            contact_item.groups = groups?;
            Ok(Some(contact_item))
        } else {
            Ok(None)
        }
    }

    /// does not update the underlying user, to update user, update_user() must be called separately
    pub(crate) fn update_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "update roster set name = ?1, subscription = ?2 where user_jid = ?3",
            (&contact.name, &contact.subscription, &contact.user_jid),
        )?;
        db.execute(
            "delete from groups_roster where contact_jid = ?1",
            [&contact.user_jid],
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid), values (?1, ?2)",
                (&group, &contact.user_jid),
            )?;
        }
        // TODO: delete orphaned groups from groups table, users etc.
        Ok(())
    }

    pub(crate) fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "insert into users (jid) values (?1) on conflict do nothing",
            [&contact.user_jid],
        )?;
        db.execute(
            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 ) on conflict do update set name = ?4, subscription = ?5",
            (&contact.user_jid, &contact.name, &contact.subscription, &contact.name, &contact.subscription),
        )?;
        db.execute(
            "delete from groups_roster where contact_jid = ?1",
            [&contact.user_jid],
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
                (group, &contact.user_jid),
            )?;
        }
        Ok(())
    }

    pub(crate) fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
        self.db
            .execute("delete from roster where user_jid = ?1", [&contact])?;
        Ok(())
    }

    pub(crate) fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
        {
            self.db.execute("delete from roster", [])?;
        }
        for contact in roster {
            self.upsert_contact(contact)?;
        }
        Ok(())
    }

    pub(crate) fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
        let db = &self.db;
        let mut roster: Vec<_> = db
            .prepare("select user_jid, name, subscription from roster")?
            .query_map([], |row| {
                Ok(Contact {
                    user_jid: row.get(0)?,
                    name: row.get(1)?,
                    subscription: row.get(2)?,
                    groups: HashSet::new(),
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        for contact in &mut roster {
            let groups: Result<HashSet<String>, _> = db
                .prepare("select group_name from groups_roster where contact_jid = ?1")?
                .query_map([&contact.user_jid], |row| Ok(row.get(0)?))?
                .collect();
            contact.groups = groups?;
        }
        Ok(roster)
    }

    pub(crate) fn read_cached_roster_with_users(&self) -> Result<Vec<(Contact, User)>, Error> {
        let db = &self.db;
        let mut roster: Vec<(Contact, User)> = db.prepare("select user_jid, name, subscription, jid, nick, avatar from roster join users on jid = user_jid")?.query_map([], |row| {
            Ok((
                Contact {
                    user_jid: row.get(0)?,
                    name: row.get(1)?,
                    subscription: row.get(2)?,
                    groups: HashSet::new(),
                },
                User {
                    jid: row.get(3)?,
                    nick: row.get(4)?,
                    avatar: row.get(5)?,
                }
            ))
        })?.collect::<Result<Vec<_>, _>>()?;
        for (contact, _) in &mut roster {
            let groups: Result<HashSet<String>, _> = db
                .prepare("select group_name from groups_roster where contact_jid = ?1")?
                .query_map([&contact.user_jid], |row| Ok(row.get(0)?))?
                .collect();
            contact.groups = groups?;
        }
        Ok(roster)
    }

    pub(crate) fn create_chat(&self, chat: Chat) -> Result<(), Error> {
        let jid = chat.correspondent();
        debug!("aick: before user identity upsert {jid}");
        let identity = self.upsert_user_identity(jid)?;
        debug!("aick: chat user identity: {identity}");
        let id = Uuid::new_v4();
        debug!("aick: chat uuid: {id}");
        self.db.execute(
            "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3)",
            (id, identity, chat.have_chatted),
        )?;
        Ok(())
    }

    /// TODO: this is NOT a read
    pub(crate) fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
        let chat_opt = self
            .db
            .query_row(
                "select primary_jid, have_chatted from chats join identities on correspondent = identities.id where primary_jid = ?1",
                [&chat],
                |row| {
                    Ok(Chat {
                        correspondent: row.get(0)?,
                        have_chatted: row.get(1)?,
                    })
                },
            )
            .optional()?;
        match chat_opt {
            Some(chat) => return Ok(chat),
            None => {
                let chat = Chat {
                    correspondent: chat,
                    have_chatted: false,
                };
                self.create_chat(chat.clone())?;
                Ok(chat)
            }
        }
    }

    pub(crate) fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
        let user = self.read_user(chat.clone())?;
        let chat_opt = self.db.query_row(
            "select primary_jid, have_chatted, jid, nick, avatar from chats join identities i on chats.correspondent = i.id join users on jid = primary_jid where primary_jid = ?1",
            [&chat],
            |row| {
                Ok((
                    Chat {
                        correspondent: row.get(0)?,
                        have_chatted: row.get(1)?,
                    },
                    User {
                        jid: row.get(2)?,
                        nick: row.get(3)?,
                        avatar: row.get(4)?,
                    }
                ))
            },
        ).optional()?;
        match chat_opt {
            Some(chat) => return Ok(chat),
            None => {
                let chat = Chat {
                    correspondent: chat,
                    have_chatted: false,
                };
                debug!("aick: creating chat");
                self.create_chat(chat.clone())?;
                debug!("aick: created chat");
                Ok((chat, user))
            }
        }
    }

    pub(crate) fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
        self.db.execute(
            "update chats set have_chatted = true where correspondent = (select id from identities where primary_jid = ?1)",
            [chat],
        )?;
        Ok(())
    }

    pub(crate) fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
        self.db
            .execute("delete from chats where correspondent = (select id from identities where primary_jid = ?1)", [chat])?;
        Ok(())
    }

    /// TODO: sorting and filtering (for now there is no sorting)
    pub(crate) fn read_chats(&self) -> Result<Vec<Chat>, Error> {
        let chats = self
            .db
            .prepare("select primary_jid, have_chatted from chats join identities on correspondent = identities.id")?
            .query_map([], |row| {
                Ok(Chat {
                    correspondent: row.get(0)?,
                    have_chatted: row.get(1)?,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
        let chats = self
            .db
            .prepare("select i.primary_jid, c.have_chatted, m.* from chats c join identities i on c.correspondent = i.id join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")?
            .query_map([], |row| {
                Ok(Chat {
                    correspondent: row.get(0)?,
                    have_chatted: row.get(1)?,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) fn read_chats_ordered_with_latest_messages(
        &self,
    ) -> Result<Vec<(Chat, Message)>, Error> {
        let mut chats = self
            .db
            .prepare(
                "
SELECT ci.primary_jid,
       c.have_chatted,
       m.id,
       ui.primary_jid,
       m.delivery,
       m.timestamp,
       m.body
FROM   chats c
       JOIN (SELECT chat_id,
                    Max(timestamp) max_timestamp
             FROM   messages
             GROUP  BY chat_id) max_timestamps
         ON c.id = max_timestamps.chat_id
       JOIN messages m
         ON max_timestamps.chat_id = m.chat_id
            AND max_timestamps.max_timestamp = m.timestamp
       JOIN identities AS ci
         ON ci.id = c.correspondent
       JOIN identities AS ui
         ON ui.id = m.from_identity
ORDER  BY m.timestamp DESC",
            )?
            .query_map([], |row| {
                Ok((
                    Chat {
                        correspondent: row.get(0)?,
                        have_chatted: row.get(1)?,
                    },
                    Message {
                        id: row.get(2)?,
                        from: row.get(3)?,
                        delivery: row.get(4)?,
                        timestamp: row.get(5)?,
                        body: Body { body: row.get(6)? },
                        // TODO: query raw sources.
                        source: Vec::new(),
                    },
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(chats)
    }

    /// chats ordered by date of last message
    // greatest-n-per-group
    pub(crate) fn read_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
        let chats = self
            .db
            .prepare(
                "
SELECT c.id            AS chat_id,
       ci.primary_jid  AS chat_correspondent,
       c.have_chatted  AS chat_have_chatted,
       m.id            AS message_id,
       m.body          AS message_body,
       m.delivery      AS message_delivery,
       m.timestamp     AS message_timestamp,
       ui.primary_jid  AS message_from_jid,
       cu.jid          AS chat_user_jid,
       cu.nick         AS chat_user_nick,
       cu.avatar       AS chat_user_avatar,
       mu.jid          AS message_user_jid,
       mu.nick         AS message_user_nick,
       mu.avatar       AS message_user_avatar
FROM   chats c
       JOIN (SELECT chat_id,
                    Max(timestamp) max_timestamp
             FROM   messages
             GROUP  BY chat_id) max_timestamps
         ON c.id = max_timestamps.chat_id
       JOIN messages m
         ON max_timestamps.chat_id = m.chat_id
            AND max_timestamps.max_timestamp = m.timestamp
       JOIN identities AS ci
         ON ci.id = c.correspondent
       JOIN identities AS ui
         ON ui.id = m.from_identity
       JOIN users AS cu
         ON cu.jid = ci.primary_jid
       JOIN users AS mu
         ON mu.jid = ui.primary_jid
ORDER  BY m.timestamp DESC",
            )?
            .query_map([], |row| {
                Ok((
                    (
                        Chat {
                            correspondent: row.get("chat_correspondent")?,
                            have_chatted: row.get("chat_have_chatted")?,
                        },
                        User {
                            jid: row.get("chat_user_jid")?,
                            nick: row.get("chat_user_nick")?,
                            avatar: row.get("chat_user_avatar")?,
                        },
                    ),
                    (
                        Message {
                            id: row.get("message_id")?,
                            from: row.get("message_from_jid")?,
                            delivery: row.get("message_delivery")?,
                            timestamp: row.get("message_timestamp")?,
                            body: Body {
                                body: row.get("message_body")?,
                            },
                            // TODO: query raw sources.
                            source: Vec::new(),
                        },
                        User {
                            jid: row.get("message_user_jid")?,
                            nick: row.get("message_user_nick")?,
                            avatar: row.get("message_user_avatar")?,
                        },
                    ),
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(chats)
    }

    #[tracing::instrument]
    fn read_chat_id(&self, chat: BareJID) -> Result<Uuid, Error> {
        let chat_id = self.db.query_row(
            "select id from chats where correspondent = (select id from identities where primary_jid = ?1)",
            [chat],
            |row| Ok(row.get(0)?),
        )?;
        Ok(chat_id)
    }

    fn read_chat_id_opt(&self, chat: BareJID) -> Result<Option<Uuid>, Error> {
        let chat_id = self
            .db
            .query_row(
                "select id from chats where correspondent = (select id from identities where primary_jid = ?1)",
                [chat],
                |row| Ok(row.get(0)?),
            )
            .optional()?;
        Ok(chat_id)
    }

    /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
    /// create direct message from incoming. MUST upsert user w/ identity, and chat w/identity
    #[tracing::instrument]
    pub(crate) fn create_message(
        &self,
        message: Message,
        chat: BareJID,
        from: BareJID,
    ) -> Result<(), Error> {
        debug!("oomla: 1");
        let chat_identity = self.upsert_user_identity(&chat)?;
        debug!("oomla: upserted chat user and got identity {chat_identity}");
        let (chat_id, _) = self.upsert_chat(chat_identity)?;
        debug!("oomla: upserted chat and got chat id {chat_id}");
        let from_identity = self.upsert_user_identity(&from)?;
        debug!("oomla: upserted from user and got user identity {from_identity}");
        tracing::debug!("creating message");
        self.db.execute("insert into messages (id, body, chat_id, from_identity, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6)", (&message.id, &message.body.body, &chat_id, &from_identity, &message.timestamp, &message.delivery))?;
        Ok(())
    }

    // returns the user identity
    pub(crate) fn upsert_user_identity(&self, chat: &BareJID) -> Result<Uuid, Error> {
        self.db.execute(
            "insert into users (jid) values (?1) on conflict do nothing",
            [&chat],
        )?;
        let identity = Uuid::new_v4();
        self.db.execute(
            "insert into identities (id, primary_jid) values (?1, ?2) on conflict do nothing",
            (identity, &chat),
        )?;
        let identity = self.db.query_row(
            "select id from identities where primary_jid = ?1",
            [&chat],
            |row| Ok(row.get(0)?),
        )?;
        Ok(identity)
    }

    pub(crate) fn upsert_chat(&self, identity: Uuid) -> Result<(Uuid, bool), Error> {
        let chat_id = Uuid::new_v4();
        self.db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (chat_id, &identity, false))?;
        let (chat_id, have_chatted) = self.db.query_row(
            "select id, have_chatted from chats where correspondent = ?1",
            [identity],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;
        Ok((chat_id, have_chatted))
    }

    pub(crate) fn upsert_chat_and_user(&self, chat: &BareJID) -> Result<bool, Error> {
        let chat_identity = self.upsert_user_identity(&chat)?;
        let (_chat_id, have_chatted) = self.upsert_chat(chat_identity)?;
        Ok(have_chatted)
    }

    pub(crate) fn update_message_delivery(
        &self,
        message: Uuid,
        delivery: Delivery,
    ) -> Result<(), Error> {
        self.db.execute(
            "update messages set delivery = ?1 where id = ?2",
            (delivery, message),
        )?;
        Ok(())
    }

    // pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
    //     Ok(Message {
    //         id: Uuid,
    //         from: todo!(),
    //         delivery: todo!(),
    //         timestamp: todo!(),
    //         body: todo!(),
    //     })
    // }

    // TODO: message updates/edits pub(crate) fn update_message(&self, message: Message) -> Result<(), Error> {}

    pub(crate) fn delete_message(&self, message: Uuid) -> Result<(), Error> {
        self.db
            .execute("delete from messages where id = ?1", [message])?;
        Ok(())
    }

    pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
        let message = self.db.query_row(
            "select id, primary_jid, delivery, timestamp, body from messages join identities on identities.id = messages.from_identity where messages.id = ?1",
            [&message],
            |row| {
                Ok(Message {
                    id: row.get(0)?,
                    // TODO: full from
                    from: row.get(1)?,
                    delivery: row.get(2)?,
                    timestamp: row.get(3)?,
                    body: Body { body: row.get(4)? },
                    // TODO: query raw sources
                    source: Vec::new(),
                })
            },
        )?;
        Ok(message)
    }

    // TODO: paging
    pub(crate) fn read_message_history(&self, chat: BareJID) -> Result<Vec<Message>, Error> {
        let chat_id = self.read_chat_id(chat)?;
        let messages = self
            .db
            .prepare(
                "select id, primary_jid, delivery, timestamp, body from messages join identities on identities.id = messages.from_identity where chat_id = (select id from identities where primary_jid = ?1)",
            )?
            .query_map([chat_id], |row| {
                Ok(Message {
                    id: row.get(0)?,
                    from: row.get(1)?,
                    delivery: row.get(2)?,
                    timestamp: row.get(3)?,
                    body: Body { body: row.get(4)? },
                    // TODO: query raw sources
                    source: Vec::new(),
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(messages)
    }

    pub(crate) fn read_message_history_with_users(
        &self,
        chat: BareJID,
    ) -> Result<Vec<(Message, User)>, Error> {
        let chat_id = self.read_chat_id(chat)?;
        let messages = self
            .db
            .prepare(
                "select id, primary_jid, delivery, timestamp, body, jid, nick, avatar from messages join users on jid = (select primary_jid from identities where id = from_identity) where chat_id = ? order by timestamp asc",
            )?
            .query_map([chat_id], |row| {
                Ok((
                    Message {
                        id: row.get(0)?,
                        // TODO: full from
                        from: row.get(1)?,
                        delivery: row.get(2)?,
                        timestamp: row.get(3)?,
                        body: Body { body: row.get(4)? },
                        // TODO: query raw sources
                        source: Vec::new(),
                    },
                    User {
                        jid: row.get(5)?,
                        nick: row.get(6)?,
                        avatar: row.get(7)?,
                    }
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(messages)
    }

    pub(crate) fn read_cached_status(&self) -> Result<Online, Error> {
        let status = self.db.query_row(
            "select show, message from cached_status where id = 0",
            [],
            |row| {
                Ok(Online {
                    show: row.get(0)?,
                    status: row.get(1)?,
                    priority: None,
                })
            },
        )?;
        Ok(status)
    }

    pub(crate) fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
        self.db.execute("insert into cached_status (id, show, message) values (0, ?1, ?2) on conflict do update set show = ?3, message = ?4", (status.show, &status.status, status.show, &status.status))?;
        Ok(())
    }

    pub(crate) fn delete_cached_status(&self) -> Result<(), Error> {
        self.db.execute(
            "update cached_status set show = null, message = null where id = 0",
            [],
        )?;
        Ok(())
    }

    pub(crate) fn read_capabilities(&self, node: String) -> Result<String, Error> {
        let capabilities = self.db.query_row(
            "select capabilities from capability_hash_nodes where node = ?1",
            [node],
            |row| Ok(row.get(0)?),
        )?;
        Ok(capabilities)
    }

    pub(crate) fn upsert_capabilities(
        &self,
        node: String,
        capabilities: String,
    ) -> Result<(), Error> {
        let now = Utc::now();
        self.db.execute("insert into capability_hash_nodes (node, timestamp, capabilities) values (?1, ?2, ?3) on conflict do update set timestamp = ?, capabilities = ?", (node, now, &capabilities, now, &capabilities))?;
        Ok(())
    }
}