aboutsummaryrefslogblamecommitdiffstats
path: root/filamento/src/db.rs
blob: b45e471dee44e11f6005fa84b79690f64df9e6bd (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
                                                                   
 
                            
             

                                              




                                         


               
                                          
                                                       
                     



                    
                       
               














                                          


         
                                       
                                            






















































                                                                                         
                                          





















































































































































































































































































































































































































































                                                                                                                                                                                                                                                              
 









































































































































































































                                                                                            











                                                      
                 



                                                       

                                                     
                          

                                                               








                                                                           

     
                                     
                                  
                                                                                               

                                                               
                                 

     
                                     
                                  


                                            
                                          
                                              
                                                               
                                 

     
















































































































































                                                                                             
     
 
                                                                       
         
                            



                                                                                


              
                                      

                                                                      























                                                                                

     
                                                       
                                                                            

                          
                                                                                                                                                                                                              





                              


                                                       
                                                                                          

                          
                                                                                                                                                                                   





                              


                                                                                                            
                                                                                                

                                                             
                              











                                                                                                                                                                                                               


                                                                                                            
                                     



                                                

                                                             
                              













                                                                                                                                                                                          

     
                                      

                                                                       


                                                                          



                                                                                                                                                                                                                                                              
                                                                          

                                                                                                                       

                                                                                













                                                                                        


              

                                                                               

















                                                                                    

     

                                                                                            























                                                                                        


                                                                                                    

                                                                                


















                                                                                       


              

                                                                                





















                                                                                                                                               


              

                                                                            
                                                                            


              
                                                                                           
         
                                                       

                               
                                          
         


              

                                                                            


















                                                                                        

     

                                                                                               






















                                                                                                                                                                                   

     
                                                                       

                                       
                        


                                                                                      




                                                                                                                                                                                                                       

                                                                      









                                                                                     

     

                                                                               


                                                                            


              
                                            



                               

                                               
                                     







                                                                                                                 

     
                                
 

                                                                      
                                                                            



                                                                 
                                                                 
                        
               








                                                                      

     

                                             
                                                                         
                        
               








                                                                                                                                                                                                                                                                                                                                                   

     

                                             
                                                          

                                              
                        
               



















                                                                                                                                                                                                                                                                                                                                                                                                 



                                             
                                                                    

                                                              
                        
               

































                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            

     
                          

                                                              




                                                            

     
                                                                          
                          
               






                                                                


                                                                                                                    
                          
                                 




                            
                                      
                                               
                                            
                                                                                                                                                                                                                                                                          


              
                                                                                  
                                       
                          

                                                                         
                         













                                                                                                                                               

     
                                                                      
                          
                                                    

                         

                                                        
                  

                   
                            
                                      

                                             
                                                    
                            



                                                                                                    
                                                  


              
                                          



                            
                        





                                                              
                                                                                   







                                  
 
                                                                                                                
 

                                                                             
                                                                       


              

                                                                                















                                                                                         
                   

                                                                                         
                           
               














                                                                                                  
     
 
                                                  


                                              
                                               
                           
               





















                                                                                                                                                                        

     

                                                                      










                                                                   

     

                                                                                                                                                                                                           


              

                                                                    


                                                                                

              
 

                                                                                   




                                                                             

     
                                      
              

                             
                            
                             
                                                                                                                                                                                                                            

              
 
use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc};

use chrono::{DateTime, Utc};
use jid::JID;
use rusqlite::{Connection, OptionalExtension};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{spawn, spawn_blocking};
#[cfg(target_arch = "wasm32")]
use tokio_with_wasm::alias as tokio;
use tracing::debug;
use uuid::Uuid;

use crate::{
    chat::{Body, Chat, Delivery, Message},
    error::{DatabaseError as Error, DatabaseOpenError},
    presence::Online,
    roster::Contact,
    user::User,
};

/// cloneable handle to the database: wraps the sending half of the `DbCommand` channel
#[derive(Clone, Debug)]
pub struct Db {
    /// channel on which `DbCommand`s are sent
    sender: mpsc::Sender<DbCommand>,
}

/// lets a `Db` be used wherever a `&mpsc::Sender<DbCommand>` is expected
impl Deref for Db {
    type Target = mpsc::Sender<DbCommand>;

    /// borrows the inner command sender
    fn deref(&self) -> &Self::Target {
        &self.sender
    }
}

/// holds the receiving half of the `DbCommand` channel together with the `rusqlite` connection
#[derive(Debug)]
pub struct DbActor {
    /// incoming `DbCommand`s
    receiver: mpsc::Receiver<DbCommand>,
    /// the `rusqlite::Connection` held by this actor
    db: Connection,
}

impl Db {
    /// builds a `DbActor` for the database at `path` on a blocking thread and returns a handle to it
    ///
    /// the command channel has a capacity of 20. the outcome of `DbActor::new` is reported back
    /// over a oneshot channel: on success the actor's `run()` is started on that same blocking
    /// thread and the handle is returned, on failure the error from `DbActor::new` is returned.
    /// if the blocking task goes away without reporting, the closed-channel error is converted
    /// into a `DatabaseOpenError`.
    // NOTE(review): the name suggests `DbActor::new` also creates and migrates the database; its
    // definition is not visible here - confirm.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn create_connect_and_migrate(
        path: impl AsRef<Path> + Send,
    ) -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::channel(20);
        let (result_send, result_recv) = oneshot::channel();
        // `spawn_blocking` needs a `'static` closure, and `path` carries no such bound, so move
        // an owned copy of the path into it instead
        let path = path.as_ref().to_path_buf();
        // NOTE(review): assumes `DbActor::new` is synchronous (the wasm32 variant calls it
        // without `.await`); `.await` is not allowed inside this non-async closure
        spawn_blocking(move || match DbActor::new(path, receiver) {
            Ok(actor) => {
                // a failed send only means the caller stopped waiting for the outcome
                let _ = result_send.send(Ok(()));
                actor.run()
            }
            Err(e) => {
                let _ = result_send.send(Err(e));
            }
        });
        match result_recv.await {
            Ok(opened) => opened.map(|()| Self { sender }),
            Err(closed) => Err(closed.into()),
        }
    }

    /// builds a `DbActor` via `DbActor::new_memory` on a blocking thread and returns a handle to it
    ///
    /// the command channel has a capacity of 20. the outcome of `DbActor::new_memory` is reported
    /// back over a oneshot channel: on success the actor's `run()` is started on that same
    /// blocking thread and the handle is returned, on failure the error is returned. if the
    /// blocking task goes away without reporting, the closed-channel error is converted into a
    /// `DatabaseOpenError`.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::channel(20);
        let (result_send, result_recv) = oneshot::channel();
        // NOTE(review): assumes `DbActor::new_memory` is synchronous (the wasm32 variant calls
        // it without `.await`); `.await` is not allowed inside this non-async closure
        spawn_blocking(move || match DbActor::new_memory(receiver) {
            Ok(actor) => {
                // a failed send only means the caller stopped waiting for the outcome
                let _ = result_send.send(Ok(()));
                // TODO: async run when not wasm
                actor.run()
            }
            Err(e) => {
                let _ = result_send.send(Err(e));
            }
        });
        match result_recv.await {
            Ok(opened) => opened.map(|()| Self { sender }),
            Err(closed) => Err(closed.into()),
        }
    }

    /// `file_name` should be a file not in a directory
    ///
    /// installs the `opfs-sahpool` vfs, builds a `DbActor` for
    /// `file:{file_name}?vfs=opfs-sahpool` and returns a handle to it. the command channel has a
    /// capacity of 20; the outcome of `DbActor::new` is reported back over a oneshot channel and
    /// on success the actor's `run()` is started inside the spawned local task.
    ///
    /// the spawned task panics (via `unwrap`) if installing the vfs fails.
    #[cfg(target_arch = "wasm32")]
    pub async fn create_connect_and_migrate(
        file_name: impl AsRef<str> + Send + 'static,
    ) -> Result<Self, DatabaseOpenError> {
        use tokio_with_wasm::spawn_local;

        let (sender, receiver) = mpsc::channel(20);
        let (result_send, result_recv) = oneshot::channel();
        spawn_blocking(move || {
            spawn_local(async move {
                debug!("installing opfs in spawn");
                rusqlite::ffi::install_opfs_sahpool(
                    Some(&rusqlite::ffi::OpfsSAHPoolCfg::default()),
                    false,
                )
                .await
                .unwrap();
                debug!("opfs installed");
                let uri = format!("file:{}?vfs=opfs-sahpool", file_name.as_ref());
                match DbActor::new(uri, receiver) {
                    Ok(actor) => {
                        // tell the caller the database is up before entering the actor loop
                        let _ = result_send.send(Ok(()));
                        actor.run()
                    }
                    Err(e) => {
                        let _ = result_send.send(Err(e));
                    }
                }
            });
        });
        match result_recv.await {
            Ok(opened) => opened.map(|()| Self { sender }),
            Err(closed) => Err(closed.into()),
        }
    }

    /// builds a `DbActor` via `DbActor::new_memory` inside `spawn_blocking` and returns a handle to it
    ///
    /// the command channel has a capacity of 20; the outcome of `DbActor::new_memory` is reported
    /// back over a oneshot channel and on success the actor's `run()` is started in the same
    /// closure.
    #[cfg(target_arch = "wasm32")]
    pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
        let (sender, receiver) = mpsc::channel(20);
        let (result_send, result_recv) = oneshot::channel();
        spawn_blocking(move || match DbActor::new_memory(receiver) {
            Ok(actor) => {
                // tell the caller the database is up before entering the actor loop
                let _ = result_send.send(Ok(()));
                actor.run()
            }
            Err(e) => {
                let _ = result_send.send(Err(e));
            }
        });
        match result_recv.await {
            Ok(opened) => opened.map(|()| Self { sender }),
            Err(closed) => Err(closed.into()),
        }
    }

    /// sends `DbCommand::CreateUser` carrying `user` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::CreateUser {
            user,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    // TODO: this is not a 'read' user
    /// sends `DbCommand::ReadUser` for the jid `user` and returns the `User` the actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadUser {
            user,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::DeleteUserNick` for `jid`; the returned `bool` says whether or not the
    /// nickname was updated
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::DeleteUserNick {
            jid,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::UpsertUserNick` with `jid` and `nick`; the returned `bool` says whether
    /// or not the nickname was updated
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpsertUserNick {
            jid,
            nick,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::DeleteUserAvatar` for `jid`
    ///
    /// the returned tuple holds whether or not the avatar was updated, and the file to delete
    /// if there existed an old avatar.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn delete_user_avatar(
        &self,
        jid: JID,
    ) -> Result<(bool, Option<String>), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::DeleteUserAvatar {
            jid,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::UpsertUserAvatar` with `jid` and `avatar`
    ///
    /// the returned tuple holds whether or not the avatar was updated, and the file to delete
    /// if there existed an old avatar.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn upsert_user_avatar(
        &self,
        jid: JID,
        avatar: String,
    ) -> Result<(bool, Option<String>), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpsertUserAvatar {
            jid,
            avatar,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    // TODO: use references everywhere
    /// sends `DbCommand::UpdateUser` carrying `user` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpdateUser {
            user,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    // TODO: should deleting a user be allowed? messages need to reference users, so deletion should probably only be permitted once everything else referencing the user has been deleted, or once it has been made clear to the person using the client that deleting a user will also delete all messages associated with them.
    // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}

    /// sends `DbCommand::CreateContact` carrying `contact` and returns the actor's reply
    ///
    /// the underlying user is not created: if it does not exist yet, create_user() must be
    /// called separately.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::CreateContact {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReadContact` for the jid `contact` and returns the `Contact` the actor
    /// replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadContact {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReadContactOpt` for the jid `contact` and returns the
    /// `Option<Contact>` the actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_contact_opt(&self, contact: JID) -> Result<Option<Contact>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadContactOpt {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::UpdateContact` carrying `contact` and returns the actor's reply
    ///
    /// the underlying user is not updated: to update the user, update_user() must be called
    /// separately.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpdateContact {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::UpsertContact` carrying `contact` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpsertContact {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::DeleteContact` for the jid `contact` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::DeleteContact {
            contact,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReplaceCachedRoster` carrying `roster` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReplaceCachedRoster {
            roster,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReadCachedRoster` and returns the contacts the actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadCachedRoster { result: reply_tx };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReadCachedRosterWithUsers` and returns the `(Contact, User)` pairs the
    /// actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_cached_roster_with_users(
        &self,
    ) -> Result<Vec<(Contact, User)>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadCachedRosterWithUsers { result: reply_tx };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::CreateChat` carrying `chat` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::CreateChat {
            chat,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    // TODO: what happens if a correspondent changes from a user to a contact? maybe the correspondent should always just be a user, and the client can show that user as a contact in the ui when they are in the loaded roster.

    /// sends `DbCommand::ReadChat` for the jid `chat` and returns the `Chat` the actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadChat {
            chat,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::MarkChatAsChatted` for the jid `chat` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::MarkChatAsChatted {
            chat,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::UpdateChatCorrespondent` with `old_chat` and `new_correspondent` and
    /// returns the `Chat` the actor replies with
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn update_chat_correspondent(
        &self,
        old_chat: Chat,
        new_correspondent: JID,
    ) -> Result<Chat, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::UpdateChatCorrespondent {
            old_chat,
            new_correspondent,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    // pub(crate) async fn update_chat

    /// sends `DbCommand::DeleteChat` for the jid `chat` and returns the actor's reply
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::DeleteChat {
            chat,
            result: reply_tx,
        };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// sends `DbCommand::ReadChats` and returns the chats the actor replies with
    ///
    /// TODO: sorting and filtering (for now there is no sorting)
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadChats { result: reply_tx };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// chats ordered by date of last message
    ///
    /// sends `DbCommand::ReadChatsOrdered` and returns the chats the actor replies with.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadChatsOrdered { result: reply_tx };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// chats ordered by date of last message
    ///
    /// sends `DbCommand::ReadChatsOrderedWithLatestMessages` and returns the `(Chat, Message)`
    /// pairs the actor replies with.
    ///
    /// errors if the command cannot be sent, if the reply channel closes before an answer
    /// arrives, or with the error the actor replies with.
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages(
        &self,
    ) -> Result<Vec<(Chat, Message)>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = DbCommand::ReadChatsOrderedWithLatestMessages { result: reply_tx };
        self.sender.send(request).await?;
        reply_rx.await?
    }

    /// Fetch chats ordered by the date of their last message, as `(chat, user)` and
    /// `(message, user)` pairs.
    ///
    /// Sends [`DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers`] and awaits the reply;
    /// channel failures are propagated through `?`.
    // greatest-n-per-group
    pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result: reply_tx })
            .await?;
        reply_rx.await?
    }

    /// Store `message` in `chat`, sent by `from`, via the database actor.
    ///
    /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
    ///
    /// Sends [`DbCommand::CreateMessage`] and awaits the reply; channel failures are
    /// propagated through `?`.
    #[tracing::instrument]
    pub(crate) async fn create_message(
        &self,
        message: Message,
        chat: JID,
        from: JID,
    ) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::CreateMessage {
                message,
                chat,
                from,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Forward an upsert of the chat and user for `chat` to the database actor.
    ///
    /// Sends [`DbCommand::UpsertChatAndUser`] and awaits the reply, returning the actor's
    /// `bool` result unchanged; channel failures are propagated through `?`.
    pub(crate) async fn upsert_chat_and_user(&self, chat: JID) -> Result<bool, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::UpsertChatAndUser {
                chat,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// create direct message from incoming. MUST upsert chat and user
    ///
    /// Sends [`DbCommand::CreateMessageWithUserResource`] and awaits the reply; channel
    /// failures are propagated through `?`.
    #[tracing::instrument]
    pub(crate) async fn create_message_with_user_resource(
        &self,
        message: Message,
        // TODO: enforce two kinds of jid. bare and full
        // must be bare jid
        chat: JID,
        // full jid
        from: JID,
    ) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::CreateMessageWithUserResource {
                message,
                chat,
                from,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Ask the database actor to record `delivery` for the message with id `message`.
    ///
    /// Sends [`DbCommand::UpdateMessageDelivery`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn update_message_delivery(
        &self,
        message: Uuid,
        delivery: Delivery,
    ) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::UpdateMessageDelivery {
                message,
                delivery,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
    //     Ok(Message {
    //         id: Uuid,
    //         from: todo!(),
    //         delivery: todo!(),
    //         timestamp: todo!(),
    //         body: todo!(),
    //     })
    // }

    // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}

    /// Ask the database actor to delete the message with id `message`.
    ///
    /// Sends [`DbCommand::DeleteMessage`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::DeleteMessage {
                message,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Fetch the message with id `message` from the database actor.
    ///
    /// Sends [`DbCommand::ReadMessage`] and awaits the reply; channel failures are propagated
    /// through `?`.
    pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadMessage {
                message,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Fetch the message history of `chat` from the database actor.
    ///
    /// Sends [`DbCommand::ReadMessageHistory`] and awaits the reply; channel failures are
    /// propagated through `?`.
    // TODO: paging
    pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadMessageHistory {
                chat,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Fetch the message history of `chat` as `(message, user)` pairs from the database actor.
    ///
    /// Sends [`DbCommand::ReadMessageHistoryWithUsers`] and awaits the reply; channel failures
    /// are propagated through `?`.
    pub(crate) async fn read_message_history_with_users(
        &self,
        chat: JID,
    ) -> Result<Vec<(Message, User)>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadMessageHistoryWithUsers {
                chat,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Fetch the cached [`Online`] status from the database actor.
    ///
    /// Sends [`DbCommand::ReadCachedStatus`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadCachedStatus { result: reply_tx })
            .await?;
        reply_rx.await?
    }

    /// Ask the database actor to store `status` as the cached status.
    ///
    /// Sends [`DbCommand::UpsertCachedStatus`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::UpsertCachedStatus {
                status,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Ask the database actor to delete the cached status.
    ///
    /// Sends [`DbCommand::DeleteCachedStatus`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::DeleteCachedStatus { result: reply_tx })
            .await?;
        reply_rx.await?
    }

    /// Fetch the stored capabilities string for `node` from the database actor.
    ///
    /// Sends [`DbCommand::ReadCapabilities`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn read_capabilities(&self, node: String) -> Result<String, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::ReadCapabilities {
                node,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }

    /// Ask the database actor to store `capabilities` for `node`.
    ///
    /// Sends [`DbCommand::UpsertCapabilities`] and awaits the reply; channel failures are
    /// propagated through `?`.
    pub(crate) async fn upsert_capabilities(
        &self,
        node: String,
        capabilities: String,
    ) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.sender
            .send(DbCommand::UpsertCapabilities {
                node,
                capabilities,
                result: reply_tx,
            })
            .await?;
        reply_rx.await?
    }
}

// TODO: i should really just make an actor macro
/// A request sent to the database actor.
///
/// Every variant carries the arguments of one database operation plus a `result`
/// oneshot sender. `DbActor::run` matches on the variant, calls the `DbActor` method of the
/// corresponding snake_case name and sends that method's return value back on `result`.
pub enum DbCommand {
    /// Handled by `DbActor::create_user`.
    CreateUser {
        user: User,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_user`.
    ReadUser {
        user: JID,
        result: oneshot::Sender<Result<User, Error>>,
    },
    /// Handled by `DbActor::delete_user_nick`; the `bool` reports whether the nick was updated.
    DeleteUserNick {
        jid: JID,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    /// Handled by `DbActor::upsert_user_nick`; the `bool` reports whether the nick was updated.
    UpsertUserNick {
        jid: JID,
        nick: String,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    /// Handled by `DbActor::delete_user_avatar`; replies with whether the avatar was updated
    /// and the previous avatar, if any.
    DeleteUserAvatar {
        jid: JID,
        result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
    },
    /// Handled by `DbActor::upsert_user_avatar`; replies with whether the avatar was updated
    /// and the previous avatar, if any.
    UpsertUserAvatar {
        jid: JID,
        avatar: String,
        result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
    },
    /// Handled by `DbActor::update_user`.
    UpdateUser {
        user: User,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::create_contact`.
    CreateContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_contact`.
    ReadContact {
        contact: JID,
        result: oneshot::Sender<Result<Contact, Error>>,
    },
    /// Handled by `DbActor::read_contact_opt`; replies with `None` when no roster row matches.
    ReadContactOpt {
        contact: JID,
        result: oneshot::Sender<Result<Option<Contact>, Error>>,
    },
    /// Handled by `DbActor::update_contact`.
    UpdateContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::upsert_contact`.
    UpsertContact {
        contact: Contact,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::delete_contact`.
    DeleteContact {
        contact: JID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::replace_cached_roster`.
    ReplaceCachedRoster {
        roster: Vec<Contact>,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_cached_roster`.
    ReadCachedRoster {
        result: oneshot::Sender<Result<Vec<Contact>, Error>>,
    },
    /// Handled by `DbActor::read_cached_roster_with_users`.
    ReadCachedRosterWithUsers {
        result: oneshot::Sender<Result<Vec<(Contact, User)>, Error>>,
    },
    /// Handled by `DbActor::create_chat`.
    CreateChat {
        chat: Chat,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_chat`.
    ReadChat {
        chat: JID,
        result: oneshot::Sender<Result<Chat, Error>>,
    },
    /// Handled by `DbActor::mark_chat_as_chatted`.
    MarkChatAsChatted {
        chat: JID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::update_chat_correspondent`; replies with the updated chat.
    UpdateChatCorrespondent {
        old_chat: Chat,
        new_correspondent: JID,
        result: oneshot::Sender<Result<Chat, Error>>,
    },
    /// Handled by `DbActor::delete_chat`.
    DeleteChat {
        chat: JID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_chats`.
    ReadChats {
        result: oneshot::Sender<Result<Vec<Chat>, Error>>,
    },
    /// Handled by `DbActor::read_chats_ordered`.
    ReadChatsOrdered {
        result: oneshot::Sender<Result<Vec<Chat>, Error>>,
    },
    /// Handled by `DbActor::read_chats_ordered_with_latest_messages`.
    ReadChatsOrderedWithLatestMessages {
        result: oneshot::Sender<Result<Vec<(Chat, Message)>, Error>>,
    },
    /// Handled by `DbActor::read_chats_ordered_with_latest_messages_and_users`.
    ReadChatsOrderedWithLatestMessagesAndUsers {
        result: oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, Error>>,
    },
    // ReadChatID {

    //     result: oneshot::Sender<Result<, Error>>,
    // },
    // ReadChatIDOpt {
    //     chat: JID,
    //     result: oneshot::Sender<Result<Option<Uuid>, Error>>,
    // },
    /// Handled by `DbActor::create_message`.
    CreateMessage {
        message: Message,
        chat: JID,
        from: JID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::upsert_chat_and_user`.
    UpsertChatAndUser {
        chat: JID,
        result: oneshot::Sender<Result<bool, Error>>,
    },
    /// Handled by `DbActor::create_message_with_user_resource`.
    CreateMessageWithUserResource {
        message: Message,
        chat: JID,
        from: JID,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::update_message_delivery`.
    UpdateMessageDelivery {
        message: Uuid,
        delivery: Delivery,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::delete_message`.
    DeleteMessage {
        message: Uuid,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_message`.
    ReadMessage {
        message: Uuid,
        result: oneshot::Sender<Result<Message, Error>>,
    },
    /// Handled by `DbActor::read_message_history`.
    ReadMessageHistory {
        chat: JID,
        result: oneshot::Sender<Result<Vec<Message>, Error>>,
    },
    /// Handled by `DbActor::read_message_history_with_users`.
    ReadMessageHistoryWithUsers {
        chat: JID,
        result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>,
    },
    /// Handled by `DbActor::read_cached_status`.
    ReadCachedStatus {
        result: oneshot::Sender<Result<Online, Error>>,
    },
    /// Handled by `DbActor::upsert_cached_status`.
    UpsertCachedStatus {
        status: Online,
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::delete_cached_status`.
    DeleteCachedStatus {
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Handled by `DbActor::read_capabilities`.
    ReadCapabilities {
        node: String,
        result: oneshot::Sender<Result<String, Error>>,
    },
    /// Handled by `DbActor::upsert_capabilities`.
    UpsertCapabilities {
        node: String,
        capabilities: String,
        result: oneshot::Sender<Result<(), Error>>,
    },
}

impl DbActor {
    /// must be run in blocking spawn
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn new(path: impl AsRef<Path>, receiver: mpsc::Receiver<DbCommand>) -> Self {
        if let Some(dir) = path.as_ref().parent() {
            if dir.is_dir() {
            } else {
                tokio::fs::create_dir_all(dir).await?;
            }
            let _file = tokio::fs::OpenOptions::new()
                .append(true)
                .create(true)
                .open(path.as_ref())
                .await?;
        }
        let url = format!(
            "{}",
            path.as_ref()
                .to_str()
                .ok_or(DatabaseOpenError::InvalidPath)?
        );
        // let db = SqlitePool::connect(&url).await?;
        // migrate!().run(&db).await?;
        // Ok(Self { db })
        let db = Connection::open(url)?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    /// must be run in blocking spawn
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Self {
        let db = Connection::open_in_memory()?;
        db.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self { db, receiver })
    }

    /// wasm32 variant: open the database named `"mem.db"` and apply the bundled
    /// `migrations/1.sql` schema.
    ///
    /// must be run in blocking spawn
    #[cfg(target_arch = "wasm32")]
    pub fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Result<Self, DatabaseOpenError> {
        let connection = Connection::open("mem.db")?;
        connection.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self {
            db: connection,
            receiver,
        })
    }

    /// wasm32 variant: open the database at `file_name` and apply the bundled
    /// `migrations/1.sql` schema.
    ///
    /// must be run in blocking spawn
    #[cfg(target_arch = "wasm32")]
    pub fn new(
        file_name: impl AsRef<Path>,
        receiver: mpsc::Receiver<DbCommand>,
    ) -> Result<Self, DatabaseOpenError> {
        let connection = Connection::open(file_name)?;
        connection.execute_batch(include_str!("../migrations/1.sql"))?;
        Ok(Self {
            db: connection,
            receiver,
        })
    }

    /// Actor main loop: receive [`DbCommand`]s until the channel is closed, run the matching
    /// database method and send its result back on the command's oneshot `result` channel.
    ///
    /// Uses `blocking_recv`, so this is meant to run on a blocking thread rather than inside
    /// an async task.
    ///
    /// A failed `result.send` only means the requester dropped its receiver and no longer
    /// wants the answer, so the send result is deliberately discarded with `let _ =`.
    pub(crate) fn run(mut self) {
        while let Some(cmd) = self.receiver.blocking_recv() {
            match cmd {
                DbCommand::CreateUser { user, result } => {
                    let _ = result.send(self.create_user(user));
                }
                DbCommand::ReadUser { user, result } => {
                    let _ = result.send(self.read_user(user));
                }
                DbCommand::DeleteUserNick { jid, result } => {
                    let _ = result.send(self.delete_user_nick(jid));
                }
                DbCommand::UpsertUserNick { jid, nick, result } => {
                    let _ = result.send(self.upsert_user_nick(jid, nick));
                }
                DbCommand::DeleteUserAvatar { jid, result } => {
                    let _ = result.send(self.delete_user_avatar(jid));
                }
                DbCommand::UpsertUserAvatar {
                    jid,
                    avatar,
                    result,
                } => {
                    let _ = result.send(self.upsert_user_avatar(jid, avatar));
                }
                DbCommand::UpdateUser { user, result } => {
                    let _ = result.send(self.update_user(user));
                }
                DbCommand::CreateContact { contact, result } => {
                    let _ = result.send(self.create_contact(contact));
                }
                DbCommand::ReadContact { contact, result } => {
                    let _ = result.send(self.read_contact(contact));
                }
                DbCommand::ReadContactOpt { contact, result } => {
                    let _ = result.send(self.read_contact_opt(&contact));
                }
                DbCommand::UpdateContact { contact, result } => {
                    let _ = result.send(self.update_contact(contact));
                }
                DbCommand::UpsertContact { contact, result } => {
                    let _ = result.send(self.upsert_contact(contact));
                }
                DbCommand::DeleteContact { contact, result } => {
                    let _ = result.send(self.delete_contact(contact));
                }
                DbCommand::ReplaceCachedRoster { roster, result } => {
                    let _ = result.send(self.replace_cached_roster(roster));
                }
                DbCommand::ReadCachedRoster { result } => {
                    let _ = result.send(self.read_cached_roster());
                }
                DbCommand::ReadCachedRosterWithUsers { result } => {
                    let _ = result.send(self.read_cached_roster_with_users());
                }
                DbCommand::CreateChat { chat, result } => {
                    let _ = result.send(self.create_chat(chat));
                }
                DbCommand::ReadChat { chat, result } => {
                    let _ = result.send(self.read_chat(chat));
                }
                DbCommand::MarkChatAsChatted { chat, result } => {
                    let _ = result.send(self.mark_chat_as_chatted(chat));
                }
                DbCommand::UpdateChatCorrespondent {
                    old_chat,
                    new_correspondent,
                    result,
                } => {
                    let _ =
                        result.send(self.update_chat_correspondent(old_chat, new_correspondent));
                }
                DbCommand::DeleteChat { chat, result } => {
                    let _ = result.send(self.delete_chat(chat));
                }
                DbCommand::ReadChats { result } => {
                    let _ = result.send(self.read_chats());
                }
                DbCommand::ReadChatsOrdered { result } => {
                    let _ = result.send(self.read_chats_ordered());
                }
                DbCommand::ReadChatsOrderedWithLatestMessages { result } => {
                    let _ = result.send(self.read_chats_ordered_with_latest_messages());
                }
                DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => {
                    let _ = result.send(self.read_chats_ordered_with_latest_messages_and_users());
                }
                DbCommand::CreateMessage {
                    message,
                    chat,
                    from,
                    result,
                } => {
                    let _ = result.send(self.create_message(message, chat, from));
                }
                DbCommand::UpsertChatAndUser { chat, result } => {
                    let _ = result.send(self.upsert_chat_and_user(&chat));
                }
                DbCommand::CreateMessageWithUserResource {
                    message,
                    chat,
                    from,
                    result,
                } => {
                    let _ =
                        result.send(self.create_message_with_user_resource(message, chat, from));
                }
                DbCommand::UpdateMessageDelivery {
                    message,
                    delivery,
                    result,
                } => {
                    let _ = result.send(self.update_message_delivery(message, delivery));
                }
                DbCommand::DeleteMessage { message, result } => {
                    let _ = result.send(self.delete_message(message));
                }
                DbCommand::ReadMessage { message, result } => {
                    let _ = result.send(self.read_message(message));
                }
                DbCommand::ReadMessageHistory { chat, result } => {
                    let _ = result.send(self.read_message_history(chat));
                }
                DbCommand::ReadMessageHistoryWithUsers { chat, result } => {
                    let _ = result.send(self.read_message_history_with_users(chat));
                }
                DbCommand::ReadCachedStatus { result } => {
                    let _ = result.send(self.read_cached_status());
                }
                DbCommand::UpsertCachedStatus { status, result } => {
                    let _ = result.send(self.upsert_cached_status(status));
                }
                DbCommand::DeleteCachedStatus { result } => {
                    let _ = result.send(self.delete_cached_status());
                }
                DbCommand::ReadCapabilities { node, result } => {
                    let _ = result.send(self.read_capabilities(node));
                }
                DbCommand::UpsertCapabilities {
                    node,
                    capabilities,
                    result,
                } => {
                    let _ = result.send(self.upsert_capabilities(node, capabilities));
                }
            }
        }
    }

    pub(crate) fn create_user(&self, user: User) -> Result<(), Error> {
        {
            self.db.execute(
                "insert into users ( jid, nick, avatar ) values ( ?1, ?2, ?3 )",
                (user.jid, user.nick, user.avatar),
            )?;
        }
        Ok(())
    }

    /// Look up the user with jid `user`; when no row exists, insert a bare row for that jid
    /// and return a user with no nick and no avatar.
    // TODO: this is not a 'read' user
    pub(crate) fn read_user(&self, user: JID) -> Result<User, Error> {
        let existing = self
            .db
            .query_row(
                "select jid, nick, avatar from users where jid = ?1",
                [&user],
                |row| {
                    Ok(User {
                        jid: row.get(0)?,
                        nick: row.get(1)?,
                        avatar: row.get(2)?,
                    })
                },
            )
            .optional()?;
        if let Some(found) = existing {
            return Ok(found);
        }
        // Unknown jid: create the row so later queries find it.
        self.db
            .execute("insert into users ( jid ) values ( ?1 )", [&user])?;
        Ok(User {
            jid: user,
            nick: None,
            avatar: None,
        })
    }

    /// Clear the nick of the user `jid`, inserting the user row if it does not exist yet.
    ///
    /// returns whether or not the nickname was updated
    pub(crate) fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
        let no_nick = None::<String>;
        let changed = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &no_nick, &no_nick, &no_nick))?;
        Ok(changed > 0)
    }

    /// Set the nick of the user `jid` to `nick`, inserting the user row if it does not exist
    /// yet.
    ///
    /// returns whether or not the nickname was updated
    pub(crate) fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
        let changed = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?;
        Ok(changed > 0)
    }

    /// Clear the avatar of the user `jid`, inserting the user row if it does not exist yet.
    ///
    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) fn delete_user_avatar(&self, jid: JID) -> Result<(bool, Option<String>), Error> {
        let db = &self.db;
        // The avatar column is NULL for users without an avatar, so it must be read as
        // `Option<String>`; reading it as `String` fails for exactly those users.
        // `optional()` covers "no such user"; `flatten()` merges that case with a NULL avatar.
        let old_avatar = db
            .query_row("select avatar from users where jid = ?1", [&jid], |row| {
                row.get::<_, Option<String>>(0)
            })
            .optional()?
            .flatten();
        let rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
        Ok((rows_affected > 0, old_avatar))
    }

    /// Set the avatar of the user `jid` to `avatar`, inserting the user row if it does not
    /// exist yet.
    ///
    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
    pub(crate) fn upsert_user_avatar(
        &self,
        jid: JID,
        avatar: String,
    ) -> Result<(bool, Option<String>), Error> {
        // Outer `Option`: does the user row exist; inner `Option`: is an avatar set.
        let previous: Option<Option<String>> = self
            .db
            .query_row("select avatar from users where jid = ?1", [&jid], |row| {
                row.get(0)
            })
            .optional()?;
        let old_avatar = previous.flatten();
        let changed = self.db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, &avatar, &avatar, &avatar))?;
        Ok((changed > 0, old_avatar))
    }

    /// Overwrite the nick and avatar of the existing `users` row whose jid is `user.jid`.
    ///
    /// Updating a jid that has no row changes nothing and still returns `Ok(())`.
    // TODO: use references everywhere
    pub(crate) fn update_user(&self, user: User) -> Result<(), Error> {
        // The users table is keyed by `jid`, and the jid is the third bound parameter.
        self.db.execute(
            "update users set nick = ?1, avatar = ?2 where jid = ?3",
            (&user.nick, &user.avatar, &user.jid),
        )?;
        Ok(())
    }

    // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
    // pub(crate) fn delete_user(&self, user: JID) -> Result<(), Error> {}

    /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
    pub(crate) fn create_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 )",
            (&contact.user_jid, &contact.name, contact.subscription),
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
                (group, &contact.user_jid),
            )?;
        }
        Ok(())
    }

    /// Load the roster entry for `contact` together with the names of its groups.
    ///
    /// Fails when no roster row matches, since `query_row` requires a row.
    pub(crate) fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
        let base = self.db.query_row(
            "select user_jid, name, subscription from roster where user_jid = ?1",
            [&contact],
            |row| {
                Ok(Contact {
                    user_jid: row.get(0)?,
                    name: row.get(1)?,
                    subscription: row.get(2)?,
                    groups: HashSet::new(),
                })
            },
        )?;
        let groups = self
            .db
            .prepare("select group_name from groups_roster where contact_jid = ?1")?
            .query_map([&contact], |row| row.get::<_, String>(0))?
            .collect::<Result<HashSet<String>, _>>()?;
        Ok(Contact { groups, ..base })
    }

    /// Like `read_contact`, but yields `Ok(None)` instead of an error when no roster row
    /// matches `contact`.
    pub(crate) fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
        let found = self
            .db
            .query_row(
                "select user_jid, name, subscription from roster where user_jid = ?1",
                [&contact],
                |row| {
                    Ok(Contact {
                        user_jid: row.get(0)?,
                        name: row.get(1)?,
                        subscription: row.get(2)?,
                        groups: HashSet::new(),
                    })
                },
            )
            .optional()?;
        match found {
            None => Ok(None),
            Some(base) => {
                let groups = self
                    .db
                    .prepare("select group_name from groups_roster where contact_jid = ?1")?
                    .query_map([&contact], |row| row.get::<_, String>(0))?
                    .collect::<Result<HashSet<String>, _>>()?;
                Ok(Some(Contact { groups, ..base }))
            }
        }
    }

    /// Update the name and subscription of an existing roster entry and replace its group
    /// memberships with `contact.groups`.
    ///
    /// does not update the underlying user, to update user, update_user() must be called separately
    pub(crate) fn update_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "update roster set name = ?1, subscription = ?2 where user_jid = ?3",
            (&contact.name, &contact.subscription, &contact.user_jid),
        )?;
        // Drop every existing membership, then re-create the ones the contact now has.
        db.execute(
            "delete from groups_roster where contact_jid = ?1",
            [&contact.user_jid],
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
                (&group, &contact.user_jid),
            )?;
        }
        // TODO: delete orphaned groups from groups table, users etc.
        Ok(())
    }

    pub(crate) fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
        let db = &self.db;
        db.execute(
            "insert into users (jid) values (?1) on conflict do nothing",
            [&contact.user_jid],
        )?;
        db.execute(
            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 ) on conflict do update set name = ?4, subscription = ?5",
            (&contact.user_jid, &contact.name, &contact.subscription, &contact.name, &contact.subscription),
        )?;
        db.execute(
            "delete from groups_roster where contact_jid = ?1",
            [&contact.user_jid],
        )?;
        for group in contact.groups {
            db.execute(
                "insert into groups (group_name) values (?1) on conflict do nothing",
                [&group],
            )?;
            db.execute(
                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
                (group, &contact.user_jid),
            )?;
        }
        Ok(())
    }

    /// Delete the roster row whose `user_jid` is `contact`.
    pub(crate) fn delete_contact(&self, contact: JID) -> Result<(), Error> {
        let _removed = self
            .db
            .execute("delete from roster where user_jid = ?1", [&contact])?;
        Ok(())
    }

    /// Replace the stored roster: delete every roster row, then upsert each contact in
    /// `roster`, stopping at the first error.
    pub(crate) fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
        self.db.execute("delete from roster", [])?;
        roster
            .into_iter()
            .try_for_each(|contact| self.upsert_contact(contact))
    }

    /// Load every roster entry together with the names of the groups it belongs to.
    pub(crate) fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
        let db = &self.db;
        let mut roster: Vec<_> = db
            .prepare("select user_jid, name, subscription from roster")?
            .query_map([], |row| {
                Ok(Contact {
                    user_jid: row.get(0)?,
                    name: row.get(1)?,
                    subscription: row.get(2)?,
                    groups: HashSet::new(),
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // Prepare the per-contact group lookup once and reuse it for every contact.
        let mut groups_stmt =
            db.prepare("select group_name from groups_roster where contact_jid = ?1")?;
        for contact in &mut roster {
            let groups = groups_stmt
                .query_map([&contact.user_jid], |row| row.get(0))?
                .collect::<Result<HashSet<String>, _>>()?;
            contact.groups = groups;
        }
        Ok(roster)
    }

    /// Load every roster entry joined with its `users` row, and fill in the names of the
    /// groups each contact belongs to.
    pub(crate) fn read_cached_roster_with_users(&self) -> Result<Vec<(Contact, User)>, Error> {
        let db = &self.db;
        let mut roster: Vec<(Contact, User)> = db
            .prepare("select user_jid, name, subscription, jid, nick, avatar from roster join users on jid = user_jid")?
            .query_map([], |row| {
                Ok((
                    Contact {
                        user_jid: row.get(0)?,
                        name: row.get(1)?,
                        subscription: row.get(2)?,
                        groups: HashSet::new(),
                    },
                    User {
                        jid: row.get(3)?,
                        nick: row.get(4)?,
                        avatar: row.get(5)?,
                    },
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // Prepare the per-contact group lookup once and reuse it for every contact.
        let mut groups_stmt =
            db.prepare("select group_name from groups_roster where contact_jid = ?1")?;
        for (contact, _) in &mut roster {
            let groups = groups_stmt
                .query_map([&contact.user_jid], |row| row.get(0))?
                .collect::<Result<HashSet<String>, _>>()?;
            contact.groups = groups;
        }
        Ok(roster)
    }

    /// Insert `chat` into the `chats` table under a freshly generated v4 UUID.
    pub(crate) fn create_chat(&self, chat: Chat) -> Result<(), Error> {
        let row_values = (Uuid::new_v4(), chat.correspondent(), chat.have_chatted);
        self.db.execute(
            "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3)",
            row_values,
        )?;
        Ok(())
    }

    // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.

    /// Load the chat whose correspondent is `chat`.
    ///
    /// Fails when no such chat exists, since `query_row` requires a row.
    pub(crate) fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
        let found = self.db.query_row(
            "select correspondent, have_chatted from chats where correspondent = ?1",
            [&chat],
            |row| {
                let correspondent = row.get(0)?;
                let have_chatted = row.get(1)?;
                Ok(Chat {
                    correspondent,
                    have_chatted,
                })
            },
        );
        Ok(found?)
    }

    /// Set `have_chatted` to true on the chat whose correspondent is `chat`.
    pub(crate) fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
        let _updated = self.db.execute(
            "update chats set have_chatted = true where correspondent = ?1",
            [chat],
        )?;
        Ok(())
    }

    /// Change the correspondent of `old_chat` to `new_correspondent` and return the chat as
    /// stored after the update (read back through the statement's `returning` clause).
    pub(crate) fn update_chat_correspondent(
        &self,
        old_chat: Chat,
        new_correspondent: JID,
    ) -> Result<Chat, Error> {
        let updated = self.db.query_row(
            "update chats set correspondent = ?1 where correspondent = ?2 returning correspondent, have_chatted",
            [&new_correspondent, old_chat.correspondent()],
            |row| {
                let correspondent = row.get(0)?;
                let have_chatted = row.get(1)?;
                Ok(Chat {
                    correspondent,
                    have_chatted,
                })
            },
        );
        Ok(updated?)
    }

    // pub(crate) fn update_chat

    /// Deletes the chat row whose correspondent is `chat`.
    ///
    /// # Errors
    /// Returns an error if the delete statement fails.
    pub(crate) fn delete_chat(&self, chat: JID) -> Result<(), Error> {
        let sql = "delete from chats where correspondent = ?1";
        self.db.execute(sql, [chat])?;
        Ok(())
    }

    /// Reads every row of the `chats` table.
    ///
    /// TODO: sorting and filtering (for now there is no sorting)
    ///
    /// # Errors
    /// Returns an error if preparing or running the query fails, or if any
    /// row cannot be converted into a `Chat`.
    pub(crate) fn read_chats(&self) -> Result<Vec<Chat>, Error> {
        let mut statement = self
            .db
            .prepare("select correspondent, have_chatted from chats")?;
        let rows = statement.query_map([], |row| {
            let correspondent = row.get(0)?;
            let have_chatted = row.get(1)?;
            Ok(Chat {
                correspondent,
                have_chatted,
            })
        })?;
        let mut chats = Vec::new();
        for row in rows {
            // Stop at the first row that fails to convert.
            chats.push(row?);
        }
        Ok(chats)
    }

    /// Reads all chats that have at least one message, ordered by the
    /// timestamp of their latest message (most recent first).
    ///
    /// Only the chat columns are returned, so the query joins `chats` against
    /// the per-chat maximum message timestamp and sorts by that value. It does
    /// not join back onto `messages`: doing so fetched message columns that
    /// were never read and yielded the same chat more than once whenever two
    /// of its messages shared the latest timestamp.
    ///
    /// # Errors
    /// Returns an error if preparing or running the query fails, or if any
    /// row cannot be converted into a `Chat`.
    pub(crate) fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
        let chats = self
            .db
            .prepare("select c.correspondent, c.have_chatted from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id order by max_timestamps.max_timestamp desc")?
            .query_map([], |row| {
                Ok(Chat {
                    correspondent: row.get(0)?,
                    have_chatted: row.get(1)?,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(chats)
    }

    /// Reads chats paired with their latest message, most recent first.
    ///
    /// This is a greatest-n-per-group query: a sub-select computes the maximum
    /// message timestamp per chat, and `messages` is joined back on
    /// `(chat_id, timestamp)` to recover the full message row. Chats without
    /// any message do not appear because both joins are inner joins.
    ///
    /// NOTE(review): the join matches on timestamp equality, so a chat whose
    /// two newest messages share the same timestamp would be returned once per
    /// such message.
    ///
    /// # Errors
    /// Returns an error if preparing or running the query fails, or if any
    /// row cannot be converted.
    pub(crate) fn read_chats_ordered_with_latest_messages(
        &self,
    ) -> Result<Vec<(Chat, Message)>, Error> {
        let sql = "select c.correspondent, c.have_chatted, m.id, m.from_jid, m.delivery, m.timestamp, m.body from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc";
        let mut statement = self.db.prepare(sql)?;
        let rows = statement.query_map([], |row| {
            // Columns 0-1 describe the chat, columns 2-6 its latest message.
            let chat = Chat {
                correspondent: row.get(0)?,
                have_chatted: row.get(1)?,
            };
            let latest = Message {
                id: row.get(2)?,
                from: row.get(3)?,
                delivery: row.get(4)?,
                timestamp: row.get(5)?,
                body: Body { body: row.get(6)? },
            };
            Ok((chat, latest))
        })?;
        let pairs = rows.collect::<Result<Vec<_>, _>>()?;
        Ok(pairs)
    }

    /// Reads chats with their latest message and the related users, most
    /// recent first.
    ///
    /// Each element is `((chat, chat_user), (message, message_user))`, where
    /// `chat_user` is the `users` row matching the chat's correspondent and
    /// `message_user` is the `users` row matching the message's `from_jid`.
    /// This is a greatest-n-per-group query over `messages`; all joins are
    /// inner joins, so a chat is only returned when it has a message and both
    /// user rows exist.
    ///
    /// # Errors
    /// Returns an error if preparing or running the query fails, or if any
    /// row cannot be converted.
    pub(crate) fn read_chats_ordered_with_latest_messages_and_users(
        &self,
    ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
        let sql = "select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc";
        let mut statement = self.db.prepare(sql)?;
        let rows = statement.query_map([], |row| {
            // Columns are read by their aliases from the select list.
            let chat = Chat {
                correspondent: row.get("chat_correspondent")?,
                have_chatted: row.get("chat_have_chatted")?,
            };
            let chat_user = User {
                jid: row.get("chat_user_jid")?,
                nick: row.get("chat_user_nick")?,
                avatar: row.get("chat_user_avatar")?,
            };
            let message = Message {
                id: row.get("message_id")?,
                from: row.get("message_from_jid")?,
                delivery: row.get("message_delivery")?,
                timestamp: row.get("message_timestamp")?,
                body: Body {
                    body: row.get("message_body")?,
                },
            };
            let message_user = User {
                jid: row.get("message_user_jid")?,
                nick: row.get("message_user_nick")?,
                avatar: row.get("message_user_avatar")?,
            };
            Ok(((chat, chat_user), (message, message_user)))
        })?;
        let entries = rows.collect::<Result<Vec<_>, _>>()?;
        Ok(entries)
    }

    /// Looks up the row id of the chat whose correspondent is `chat`.
    ///
    /// # Errors
    /// Returns an error if the query fails; `query_row` also reports an error
    /// when no row matches. See `read_chat_id_opt` for a variant that returns
    /// `None` in that case.
    #[tracing::instrument]
    fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
        let sql = "select id from chats where correspondent = ?1";
        Ok(self.db.query_row(sql, [chat], |row| row.get(0))?)
    }

    /// Looks up the row id of the chat whose correspondent is `chat`,
    /// yielding `None` when no such chat exists.
    ///
    /// # Errors
    /// Returns an error if the query fails for any reason other than the
    /// absence of a matching row.
    fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
        let sql = "select id from chats where correspondent = ?1";
        let lookup = self.db.query_row(sql, [chat], |row| row.get(0));
        // `optional()` maps the "no rows" outcome to `Ok(None)`.
        Ok(lookup.optional()?)
    }

    /// Stores `message` in the chat identified by `chat`.
    ///
    /// If the chat doesn't already exist, it must be created by calling
    /// `create_chat()` before running this function: the chat id is resolved
    /// with `read_chat_id`, whose error is propagated when the lookup fails.
    /// The sender is stored as the bare form of `from` together with its
    /// resource part.
    ///
    /// # Errors
    /// Returns an error if the chat lookup or the insert statement fails.
    #[tracing::instrument]
    pub(crate) fn create_message(
        &self,
        message: Message,
        chat: JID,
        from: JID,
    ) -> Result<(), Error> {
        let sender = from.as_bare();
        let chat_id = self.read_chat_id(chat)?;
        tracing::debug!("creating message");
        let sql = "insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
        let params = (
            &message.id,
            &message.body.body,
            &chat_id,
            &sender,
            &from.resourcepart,
            &message.timestamp,
            &message.delivery,
        );
        self.db.execute(sql, params)?;
        Ok(())
    }

    /// Ensures that both a user row and a chat row exist for the bare form of
    /// `chat`, then reports the chat's stored `have_chatted` flag.
    ///
    /// Both inserts use `on conflict do nothing`, so existing rows are left
    /// untouched; a newly created chat starts with `have_chatted = false`.
    ///
    /// # Errors
    /// Returns an error if any of the statements fails.
    pub(crate) fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {
        let correspondent = chat.as_bare();
        self.db.execute(
            "insert into users (jid) values (?1) on conflict do nothing",
            [&correspondent],
        )?;
        // Only used when the chat row does not exist yet.
        let new_chat_id = Uuid::new_v4();
        self.db.execute(
            "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing",
            (new_chat_id, &correspondent, false),
        )?;
        let stored = self.db.query_row(
            "select correspondent, have_chatted from chats where correspondent = ?1",
            [&correspondent],
            |row| {
                let correspondent = row.get(0)?;
                let have_chatted = row.get(1)?;
                Ok(Chat {
                    correspondent,
                    have_chatted,
                })
            },
        )?;
        Ok(stored.have_chatted)
    }

    /// Creates a direct message from an incoming stanza, recording the
    /// sender's resource first. MUST upsert chat and user beforehand.
    ///
    /// When `from` carries a resource part, a `(bare_jid, resource)` row is
    /// inserted into `resources` unless it already exists. The message itself
    /// is then stored through `create_message` using the bare form of `chat`.
    ///
    /// # Errors
    /// Returns an error if the resource insert or `create_message` fails.
    #[tracing::instrument]
    pub(crate) fn create_message_with_user_resource(
        &self,
        message: Message,
        // TODO: enforce two kinds of jid. bare and full
        // must be bare jid
        chat: JID,
        // full jid
        from: JID,
    ) -> Result<(), Error> {
        let bare_sender = from.as_bare();
        let bare_chat = chat.as_bare();
        tracing::debug!("creating resource");
        if let Some(resource) = from.resourcepart.as_ref() {
            let sql =
                "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing";
            self.db.execute(sql, (&bare_sender, resource))?;
        }
        self.create_message(message, bare_chat, from)
    }

    /// Overwrites the `delivery` column of the message with id `message`.
    ///
    /// # Errors
    /// Returns an error if the update statement fails.
    pub(crate) fn update_message_delivery(
        &self,
        message: Uuid,
        delivery: Delivery,
    ) -> Result<(), Error> {
        let sql = "update messages set delivery = ?1 where id = ?2";
        self.db.execute(sql, (delivery, message))?;
        Ok(())
    }

    // pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
    //     Ok(Message {
    //         id: Uuid,
    //         from: todo!(),
    //         delivery: todo!(),
    //         timestamp: todo!(),
    //         body: todo!(),
    //     })
    // }

    // TODO: message updates/edits pub(crate) fn update_message(&self, message: Message) -> Result<(), Error> {}

    /// Deletes the message row with id `message`.
    ///
    /// # Errors
    /// Returns an error if the delete statement fails.
    pub(crate) fn delete_message(&self, message: Uuid) -> Result<(), Error> {
        let sql = "delete from messages where id = ?1";
        self.db.execute(sql, [message])?;
        Ok(())
    }

    /// Loads the message with id `message`.
    ///
    /// # Errors
    /// Returns an error if the query fails; `query_row` also reports an error
    /// when no row matches.
    pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
        let sql = "select id, from_jid, delivery, timestamp, body from messages where id = ?1";
        let found = self.db.query_row(sql, [&message], |row| {
            let id = row.get(0)?;
            // TODO: full from
            let from = row.get(1)?;
            let delivery = row.get(2)?;
            let timestamp = row.get(3)?;
            let body = Body { body: row.get(4)? };
            Ok(Message {
                id,
                from,
                delivery,
                timestamp,
                body,
            })
        })?;
        Ok(found)
    }

    // TODO: paging
    /// Reads every message of the chat with correspondent `chat`, oldest
    /// first.
    ///
    /// Rows are explicitly ordered by `timestamp asc`, matching
    /// `read_message_history_with_users`; without an `order by` clause the
    /// order in which rows are returned is unspecified.
    ///
    /// # Errors
    /// Returns an error if the chat lookup fails, if preparing or running the
    /// query fails, or if any row cannot be converted into a `Message`.
    pub(crate) fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
        let chat_id = self.read_chat_id(chat)?;
        let messages = self
            .db
            .prepare(
                "select id, from_jid, delivery, timestamp, body from messages where chat_id = ?1 order by timestamp asc",
            )?
            .query_map([chat_id], |row| {
                Ok(Message {
                    id: row.get(0)?,
                    // TODO: full from
                    from: row.get(1)?,
                    delivery: row.get(2)?,
                    timestamp: row.get(3)?,
                    body: Body { body: row.get(4)? },
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(messages)
    }

    /// Reads every message of the chat with correspondent `chat`, oldest
    /// first, each paired with the `users` row of its sender.
    ///
    /// The join on `users` is an inner join, so messages whose `from_jid` has
    /// no matching user row are not returned.
    ///
    /// # Errors
    /// Returns an error if the chat lookup fails, if preparing or running the
    /// query fails, or if any row cannot be converted.
    pub(crate) fn read_message_history_with_users(
        &self,
        chat: JID,
    ) -> Result<Vec<(Message, User)>, Error> {
        let chat_id = self.read_chat_id(chat)?;
        let sql = "select id, from_jid, delivery, timestamp, body, jid, nick, avatar from messages join users on jid = from_jid where chat_id = ? order by timestamp asc";
        let mut statement = self.db.prepare(sql)?;
        let rows = statement.query_map([chat_id], |row| {
            // Columns 0-4 describe the message, columns 5-7 its sender.
            let message = Message {
                id: row.get(0)?,
                // TODO: full from
                from: row.get(1)?,
                delivery: row.get(2)?,
                timestamp: row.get(3)?,
                body: Body { body: row.get(4)? },
            };
            let sender = User {
                jid: row.get(5)?,
                nick: row.get(6)?,
                avatar: row.get(7)?,
            };
            Ok((message, sender))
        })?;
        let history = rows.collect::<Result<Vec<_>, _>>()?;
        Ok(history)
    }

    /// Reads the cached presence from the `cached_status` row with id 0.
    ///
    /// Only `show` and `message` are stored; the returned `Online` always has
    /// `priority` set to `None`.
    ///
    /// # Errors
    /// Returns an error if the query fails; `query_row` also reports an error
    /// when the row does not exist.
    pub(crate) fn read_cached_status(&self) -> Result<Online, Error> {
        let sql = "select show, message from cached_status where id = 0";
        let cached = self.db.query_row(sql, [], |row| {
            let show = row.get(0)?;
            let status = row.get(1)?;
            Ok(Online {
                show,
                status,
                priority: None,
            })
        })?;
        Ok(cached)
    }

    /// Stores `status` as the cached presence in the `cached_status` row with
    /// id 0, creating the row if needed and overwriting it otherwise.
    ///
    /// The update branch reads the would-be-inserted values through the
    /// upsert `excluded` pseudo-table, so each value is bound only once and
    /// the insert and update halves cannot drift apart. `status.priority` is
    /// not persisted.
    ///
    /// # Errors
    /// Returns an error if the statement fails.
    pub(crate) fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
        self.db.execute(
            "insert into cached_status (id, show, message) values (0, ?1, ?2) on conflict do update set show = excluded.show, message = excluded.message",
            (status.show, &status.status),
        )?;
        Ok(())
    }

    /// Clears the cached presence by setting `show` and `message` to null on
    /// the `cached_status` row with id 0. The row itself is not removed.
    ///
    /// # Errors
    /// Returns an error if the update statement fails.
    pub(crate) fn delete_cached_status(&self) -> Result<(), Error> {
        let sql = "update cached_status set show = null, message = null where id = 0";
        self.db.execute(sql, [])?;
        Ok(())
    }

    /// Reads the cached `capabilities` string stored for `node` in
    /// `capability_hash_nodes`.
    ///
    /// # Errors
    /// Returns an error if the query fails; `query_row` also reports an error
    /// when no row matches.
    pub(crate) fn read_capabilities(&self, node: String) -> Result<String, Error> {
        let sql = "select capabilities from capability_hash_nodes where node = ?1";
        Ok(self.db.query_row(sql, [node], |row| row.get(0))?)
    }

    /// Caches `capabilities` for `node` in `capability_hash_nodes`, stamping
    /// the row with the current UTC time.
    ///
    /// A new row is inserted when none conflicts; otherwise the existing row's
    /// `timestamp` and `capabilities` are overwritten. The update branch uses
    /// the upsert `excluded` pseudo-table rather than a second set of
    /// bindings, which avoids mixing numbered (`?1`) and anonymous (`?`)
    /// placeholders in one statement.
    ///
    /// # Errors
    /// Returns an error if the statement fails.
    pub(crate) fn upsert_capabilities(
        &self,
        node: String,
        capabilities: String,
    ) -> Result<(), Error> {
        let now = Utc::now();
        self.db.execute(
            "insert into capability_hash_nodes (node, timestamp, capabilities) values (?1, ?2, ?3) on conflict do update set timestamp = excluded.timestamp, capabilities = excluded.capabilities",
            (node, now, &capabilities),
        )?;
        Ok(())
    }
}