diff options
Diffstat (limited to '')
-rw-r--r-- | filamento/src/db.rs | 234 | ||||
-rw-r--r-- | filamento/src/files/opfs.rs | 38 |
2 files changed, 154 insertions, 118 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs index b45e471..d4edb4c 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -1,3 +1,4 @@ +use core::fmt::Display; use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc}; use chrono::{DateTime, Utc}; @@ -21,11 +22,11 @@ use crate::{ #[derive(Clone, Debug)] pub struct Db { - sender: mpsc::Sender<DbCommand>, + sender: mpsc::UnboundedSender<DbCommand>, } impl Deref for Db { - type Target = mpsc::Sender<DbCommand>; + type Target = mpsc::UnboundedSender<DbCommand>; fn deref(&self) -> &Self::Target { &self.sender @@ -34,10 +35,24 @@ impl Deref for Db { #[derive(Debug)] pub struct DbActor { - receiver: mpsc::Receiver<DbCommand>, + receiver: mpsc::UnboundedReceiver<DbCommand>, db: Connection, } +macro_rules! impl_db_sends { + ($($command:ident => $name:ident($($arg:ident: $arg_t:ty),*) -> $ret:ty);*) => { + $( + pub(crate) async fn $name(&self, $($arg: $arg_t),*) -> Result<$ret, Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::$command { $($arg,)* result }; + let _ = self.sender.send(command); + let result = recv.await?; + result + } + )* + } +} + impl Db { #[cfg(not(target_arch = "wasm32"))] pub async fn create_connect_and_migrate( @@ -99,7 +114,7 @@ impl Db { ) -> Result<Self, DatabaseOpenError> { use tokio_with_wasm::spawn_local; - let (sender, receiver) = mpsc::channel(20); + let (sender, receiver) = mpsc::unbounded_channel(); let (result_send, result_recv) = oneshot::channel(); spawn_blocking(move || { spawn_local(async move { @@ -116,7 +131,7 @@ impl Db { match result { Ok(a) => { result_send.send(Ok(())); - a.run() + a.run().await } Err(e) => { result_send.send(Err(e)); @@ -135,14 +150,15 @@ impl Db { #[cfg(target_arch = "wasm32")] pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> { - let (sender, receiver) = mpsc::channel(20); + let (sender, receiver) = mpsc::unbounded_channel(); let (result_send, result_recv) = oneshot::channel(); spawn_blocking(move || { let result = DbActor::new_memory(receiver); match result { Ok(a) => { result_send.send(Ok(())); - a.run() + tokio_with_wasm::spawn_local(async { a.run().await }); + // a.run() } Err(e) => { result_send.send(Err(e)); @@ -161,7 +177,7 @@ impl Db { pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::CreateUser { user, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -170,7 +186,7 @@ impl Db { pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadUser { user, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -179,7 +195,7 @@ impl Db { pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteUserNick { jid, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -188,7 +204,7 @@ impl Db { pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpsertUserNick { jid, nick, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -200,7 +216,7 @@ impl Db { ) -> Result<(bool, Option<String>), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteUserAvatar { jid, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -217,7 +233,7 @@ impl Db { avatar, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -226,7 +242,7 @@ impl Db { pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpdateUser { user, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -238,7 +254,7 @@ impl Db { pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::CreateContact { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -246,7 +262,7 @@ impl Db { pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadContact { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -254,7 +270,7 @@ impl Db { pub(crate) async fn read_contact_opt(&self, contact: JID) -> Result<Option<Contact>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadContactOpt { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -263,7 +279,7 @@ impl Db { pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpdateContact { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -271,7 +287,7 @@ impl Db { pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpsertContact { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -279,7 +295,7 @@ impl Db { pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteContact { contact, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -287,7 +303,7 @@ impl Db { pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReplaceCachedRoster { roster, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -295,7 +311,7 @@ impl Db { pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadCachedRoster { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -305,7 +321,7 @@ impl Db { ) -> Result<Vec<(Contact, User)>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadCachedRosterWithUsers { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -313,7 +329,7 @@ impl Db { pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::CreateChat { chat, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -323,7 +339,7 @@ impl Db { pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChat { chat, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -331,7 +347,7 @@ impl Db { pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::MarkChatAsChatted { chat, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -347,7 +363,7 @@ impl Db { new_correspondent, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -357,7 +373,7 @@ impl Db { pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteChat { chat, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -366,7 +382,7 @@ impl Db { pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChats { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -376,7 +392,7 @@ impl Db { pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChatsOrdered { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -388,7 +404,7 @@ impl Db { ) -> Result<Vec<(Chat, Message)>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChatsOrderedWithLatestMessages { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -400,7 +416,7 @@ impl Db { ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -420,7 +436,7 @@ impl Db { from, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -428,7 +444,7 @@ impl Db { pub(crate) async fn upsert_chat_and_user(&self, chat: JID) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpsertChatAndUser { chat, result }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -444,6 +460,7 @@ impl Db { // full jid from: JID, ) -> Result<(), Error> { + tracing::info!("MSGDEBUG create_message_with_user_resource exists"); let (result, recv) = oneshot::channel(); let command = DbCommand::CreateMessageWithUserResource { message, @@ -451,7 +468,7 @@ impl Db { from, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -467,7 +484,7 @@ impl Db { delivery, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } @@ -484,73 +501,17 @@ impl Db { // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {} - pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::DeleteMessage { message, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::ReadMessage { message, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - // TODO: paging - pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::ReadMessageHistory { chat, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn read_message_history_with_users( - &self, - chat: JID, - ) -> Result<Vec<(Message, User)>, Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::ReadMessageHistoryWithUsers { chat, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::ReadCachedStatus { result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::UpsertCachedStatus { status, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::DeleteCachedStatus { result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } - - pub(crate) async fn read_capabilities(&self, node: String) -> Result<String, Error> { - let (result, recv) = oneshot::channel(); - let command = DbCommand::ReadCapabilities { node, result }; - self.sender.send(command).await?; - let result = recv.await?; - result - } + impl_db_sends!( + ReadCapabilities => read_capabilities(node: String) -> String; + DeleteCachedStatus => delete_cached_status() -> (); + UpsertCachedStatus => upsert_cached_status(status: Online) -> (); + ReadCachedStatus => read_cached_status() -> Online; + ReadMessageHistoryWithUsers => read_message_history_with_users(chat: JID) -> Vec<(Message, User)>; + // TODO: paging + ReadMessageHistory => read_message_history(chat: JID) -> Vec<Message>; + ReadMessage => read_message(message: Uuid) -> Message; + DeleteMessage => delete_message(message: Uuid) -> () + ); pub(crate) async fn upsert_capabilities( &self, @@ -563,13 +524,14 @@ impl Db { capabilities, result, }; - self.sender.send(command).await?; + self.sender.send(command); let result = recv.await?; result } } // TODO: i should really just make an actor macro +#[derive(Debug)] pub enum DbCommand { CreateUser { user: User, @@ -734,6 +696,52 @@ pub enum DbCommand { }, } +impl Display for DbCommand { + #[rustfmt::skip] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + DbCommand::CreateUser { user, result } => "CreateUser", + DbCommand::ReadUser { user, result } => "ReadUser", + DbCommand::DeleteUserNick { jid, result } => "DeleteUserNick", + DbCommand::UpsertUserNick { jid, nick, result } => "UpsertUserNick", + DbCommand::DeleteUserAvatar { jid, result } => "DeleteUserAvatar", + DbCommand::UpsertUserAvatar { jid, avatar, result } => "UpsertUserAvatar", + DbCommand::UpdateUser { user, result } => "UpdateUser", + DbCommand::CreateContact { contact, result } => "CreateContact", + DbCommand::ReadContact { contact, result } => "ReadContact", + DbCommand::ReadContactOpt { contact, result } => "ReadContactOpt", + DbCommand::UpdateContact { contact, result } => "UpdateContact", + DbCommand::UpsertContact { contact, result } => "UpsertContact", + DbCommand::DeleteContact { contact, result } => "DeleteContact", + DbCommand::ReplaceCachedRoster { roster, result } => "ReplaceCachedRoster", + DbCommand::ReadCachedRoster { result } => "ReadCachedRoster", + DbCommand::ReadCachedRosterWithUsers { result } => "ReadCachedRosterWithUsers", + DbCommand::CreateChat { chat, result } => "CreateChat", + DbCommand::ReadChat { chat, result } => "ReadChat", + DbCommand::MarkChatAsChatted { chat, result } => "MarkChatAsChatted", + DbCommand::UpdateChatCorrespondent { old_chat, new_correspondent, result } => "UpdateChatCorrespondent", + DbCommand::DeleteChat { chat, result } => "DeleteChat", + DbCommand::ReadChats { result } => "ReadChats", + DbCommand::ReadChatsOrdered { result } => "ReadChatsOrdered", + DbCommand::ReadChatsOrderedWithLatestMessages { result } => "ReadChatsOrderedWithLatestMessages", + DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => "ReadChatsOrderedWithLatestMessagesAndUsers", + DbCommand::CreateMessage { message, chat, from, result } => "CreateMessage", + DbCommand::UpsertChatAndUser { chat, result } => "UpsertChatAndUser", + DbCommand::CreateMessageWithUserResource { message, chat, from, result } => "CreateMessageWithUserResource", + DbCommand::UpdateMessageDelivery { message, delivery, result } => "UpdateMessageDelivery", + DbCommand::DeleteMessage { message, result } => "DeleteMessage", + DbCommand::ReadMessage { message, result } => "ReadMessage", + DbCommand::ReadMessageHistory { chat, result } => "ReadMessageHistory", + DbCommand::ReadMessageHistoryWithUsers { chat, result } => "ReadMessageHistoryWithUsers", + DbCommand::ReadCachedStatus { result } => "ReadCachedStatus", + DbCommand::UpsertCachedStatus { status, result } => "UpsertCachedStatus", + DbCommand::DeleteCachedStatus { result } => "DeleteCachedStatus", + DbCommand::ReadCapabilities { node, result } => "ReadCapabilities", + DbCommand::UpsertCapabilities { node, capabilities, result } => "UpsertCapabilities", + }) + } +} + impl DbActor { /// must be run in blocking spawn #[cfg(not(target_arch = "wasm32"))] @@ -773,7 +781,9 @@ impl DbActor { /// must be run in blocking spawn #[cfg(target_arch = "wasm32")] - pub fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Result<Self, DatabaseOpenError> { + pub fn new_memory( + receiver: mpsc::UnboundedReceiver<DbCommand>, + ) -> Result<Self, DatabaseOpenError> { let db = Connection::open("mem.db")?; db.execute_batch(include_str!("../migrations/1.sql"))?; Ok(Self { db, receiver }) @@ -783,15 +793,17 @@ impl DbActor { #[cfg(target_arch = "wasm32")] pub fn new( file_name: impl AsRef<Path>, - receiver: mpsc::Receiver<DbCommand>, + receiver: mpsc::UnboundedReceiver<DbCommand>, ) -> Result<Self, DatabaseOpenError> { let db = Connection::open(file_name)?; db.execute_batch(include_str!("../migrations/1.sql"))?; Ok(Self { db, receiver }) } - pub(crate) fn run(mut self) { - while let Some(cmd) = self.receiver.blocking_recv() { + pub(crate) async fn run(mut self) { + while let Some(cmd) = self.receiver.recv().await { + let cmd_name = cmd.to_string(); + tracing::warn!("command recv: {cmd_name}"); match cmd { DbCommand::CreateUser { user, result } => { result.send(self.create_user(user)); @@ -909,10 +921,14 @@ impl DbActor { result.send(self.read_message(message)); } DbCommand::ReadMessageHistory { chat, result } => { + tracing::warn!("ReadMessageHistory: {chat}"); result.send(self.read_message_history(chat)); + tracing::warn!("ReadMessageHistory: read and sent") } DbCommand::ReadMessageHistoryWithUsers { chat, result } => { + tracing::warn!("ReadMessageHistoryWithUsers: {chat}"); result.send(self.read_message_history_with_users(chat)); + tracing::warn!("ReadMessageHistoryWithUsers: read and sent") } DbCommand::ReadCachedStatus { result } => { result.send(self.read_cached_status()); @@ -934,7 +950,9 @@ impl DbActor { result.send(self.upsert_capabilities(node, capabilities)); } } + tracing::warn!("command finished: {cmd_name}"); } + tracing::error!("command: db actor exited"); } pub(crate) fn create_user(&self, user: User) -> Result<(), Error> { diff --git a/filamento/src/files/opfs.rs b/filamento/src/files/opfs.rs index e040762..fb32c6e 100644 --- a/filamento/src/files/opfs.rs +++ b/filamento/src/files/opfs.rs @@ -1,10 +1,11 @@ use std::path::Path; use thiserror::Error; -use wasm_bindgen::JsValue; +use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ - window, File, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, FileSystemGetFileOptions, FileSystemWritableFileStream, Url + File, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, + FileSystemGetFileOptions, FileSystemWritableFileStream, Url, js_sys, window, }; use crate::FileStore; @@ -26,7 +27,9 @@ impl FilesOPFS { JsFuture::from(opfs_root.get_directory_handle_with_options(directory, &options)) .await? .into(); - Ok(Self { directory: directory_string }) + Ok(Self { + directory: directory_string, + }) } pub async fn get_src(&self, file_name: impl AsRef<str>) -> Result<String, OPFSError> { @@ -73,12 +76,21 @@ impl FileStore for FilesOPFS { .into(); let options = FileSystemGetFileOptions::new(); options.set_create(true); - let handle: FileSystemFileHandle = JsFuture::from(directory.get_file_handle_with_options(name, &options)) - .await? - .into(); + let handle: FileSystemFileHandle = + JsFuture::from(directory.get_file_handle_with_options(name, &options)) + .await? + .into(); let write_handle: FileSystemWritableFileStream = JsFuture::from(handle.create_writable()).await?.into(); - let write_promise = write_handle.write_with_u8_array(data)?; + + let buffer = js_sys::ArrayBuffer::new(data.len() as u32); + let u8arr = js_sys::Uint8Array::new(&buffer); + for (idx, v) in data.iter().enumerate() { + u8arr.set_index(idx as u32, *v); + } + + let write_promise = write_handle.write_with_js_u8_array(&u8arr).unwrap(); + // let write_promise = write_handle.write_with_u8_array(data)?; let _ = JsFuture::from(write_promise).await?; let _ = JsFuture::from(write_handle.close()).await?; Ok(()) @@ -98,13 +110,19 @@ impl FileStore for FilesOPFS { #[derive(Error, Clone, Debug)] pub enum OPFSError { - #[error("not found")] - NotFound, + #[error("js opfs error: {0}")] + Error(String), } // TODO: better errors impl From<JsValue> for OPFSError { fn from(value: JsValue) -> Self { - Self::NotFound + Self::Error( + value + .dyn_into::<js_sys::Error>() + .ok() + .and_then(|err| err.message().as_string()) + .unwrap_or(String::from("<no string>")), + ) } } |