summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib.rs309
1 files changed, 224 insertions, 85 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 11ebc54..b0cada7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,7 +6,7 @@ use std::{
ops::{Deref, DerefMut},
rc::Rc,
str::FromStr,
- sync::{atomic::AtomicUsize, Arc},
+ sync::{atomic::AtomicUsize, Arc, RwLock},
thread::sleep,
time::{self, Duration},
};
@@ -30,7 +30,7 @@ use leptos_meta::Stylesheet;
use reactive_stores::Store;
use stylance::import_style;
use thiserror::Error;
-use tokio::sync::{mpsc::Receiver, Mutex};
+use tokio::sync::{mpsc::{self, Receiver}, Mutex};
use tracing::debug;
use uuid::Uuid;
@@ -224,6 +224,78 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client
}
}
+pub struct MessageSubscriptions {
+ all: Vec<mpsc::Sender<(JID, MacawMessage)>>,
+ subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>,
+}
+
+impl MessageSubscriptions {
+ pub fn new() -> Self {
+ Self {
+ all: Vec::new(),
+ subset: HashMap::new(),
+ }
+ }
+
+ pub async fn broadcast(
+ &mut self,
+ to: JID,
+ message: MacawMessage,
+ ) {
+ // subscriptions to all
+ let mut index = 0;
+ while let Some(sender) = self.all.get(index) {
+ match sender.send((to.clone(), message.clone())).await {
+ Ok(_) => {
+ index += 1;
+ }
+ Err(_) => {
+ self.all.swap_remove(index);
+ }
+ }
+ }
+
+ // subscriptions to specific chat
+ if let Some(subscribers) = self.subset.get_mut(&to) {
+ let mut index = 0;
+ while let Some(sender) = subscribers.get(index) {
+ match sender.send(message.clone()).await {
+ Ok(_) => {
+ index += 1;
+ }
+ Err(_) => {
+ subscribers.swap_remove(index);
+ }
+ }
+ }
+ if subscribers.is_empty() {
+ self.subset.remove(&to);
+ }
+ }
+ }
+
+ pub fn subscribe_all(
+ &mut self,
+ ) -> Receiver<(JID, MacawMessage)> {
+ let (send, recv) = mpsc::channel(10);
+ self.all.push(send);
+ recv
+ }
+
+ pub fn subscribe_chat(
+ &mut self,
+ chat: JID,
+ ) -> Receiver<MacawMessage> {
+ let (send, recv) = mpsc::channel(10);
+ if let Some(chat_subscribers) = self.subset.get_mut(&chat) {
+ chat_subscribers.push(send);
+ } else {
+ self.subset.insert(chat, vec![send]);
+ }
+ recv
+ }
+}
+
#[component]
fn Macaw(
// TODO: logout
@@ -233,22 +305,15 @@ fn Macaw(
) -> impl IntoView {
provide_context(client);
- let (new_messages, set_new_messages) = signal(None::<(JID, MacawMessage)>);
- provide_context(new_messages);
+ let message_subscriptions = RwSignal::new(MessageSubscriptions::new());
+ provide_context(message_subscriptions);
let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new();
- provide_context(RwSignal::new_local(messages_store));
+ provide_context(messages_store);
let chats_store: StateStore<JID, Store<Chat>> = StateStore::new();
- provide_context(RwSignal::new_local(chats_store));
+ provide_context(chats_store);
let users_store: StateStore<JID, Store<User>> = StateStore::new();
- provide_context(RwSignal::new_local(users_store));
-
- // // here we create a signal in the root that can be consumed
- // // anywhere in the app.
- // let (count, set_count) = signal(0);
- // // we'll pass the setter to specific components,
- // // but provide the count itself to the whole app via context
- // provide_context(count);
+ provide_context(users_store);
OnceResource::new(async move {
while let Some(update) = updates.recv().await {
@@ -259,8 +324,11 @@ fn Macaw(
UpdateMessage::RosterDelete(jid) => {},
UpdateMessage::Presence { from, presence } => {},
UpdateMessage::Message { to, from, message } => {
+ // debug!("before got message");
let new_message = MacawMessage::got_message_and_user(message, from);
- set_new_messages.set(Some((to, new_message)));
+ // debug!("after got message");
+ spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await });
+ // debug!("after set message");
},
UpdateMessage::MessageDelivery { id, chat, delivery } => {},
UpdateMessage::SubscriptionRequest(jid) => {},
@@ -274,84 +342,148 @@ fn Macaw(
}
// V has to be an arc signal
-struct StateStore<K, V> {
- store: Rc<RefCell<HashMap<K, (V, usize)>>>,
+#[derive(Debug)]
+struct ArcStateStore<K, V> {
+ store: Arc<RwLock<HashMap<K, (V, usize)>>>,
+}
+
+impl<K, V> PartialEq for ArcStateStore<K, V> {
+ fn eq(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.store, &other.store)
+ }
}
-impl<K, V> Clone for StateStore<K, V> {
+impl<K, V> Clone for ArcStateStore<K, V> {
fn clone(&self) -> Self {
Self {
- store: self.store.clone(),
+ store: Arc::clone(&self.store)
}
}
}
-impl<K, V> StateStore<K, V> {
+impl<K, V> Eq for ArcStateStore<K, V> {}
+
+impl<K, V> ArcStateStore<K, V> {
pub fn new() -> Self {
Self {
- store: Rc::new(RefCell::new(HashMap::new())),
+ store: Arc::new(RwLock::new(HashMap::new())),
}
}
}
-impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> {
- pub fn store(&self, key: K, value: V) -> StateListener<K, V> {
- let mut store = self.store.borrow_mut();
- if let Some((v, count)) = store.get_mut(&key) {
- *v = value.clone();
- *count += 1;
- StateListener {
- value,
- cleaner: StateCleaner { key, _ty: PhantomData },
- }
- } else {
- store.insert(key.clone(), (value.clone(), 1));
- StateListener {
- value,
- cleaner: StateCleaner {
- key,
- _ty: PhantomData,
- }
- }
+#[derive(Debug)]
+struct StateStore<K, V, S = SyncStorage> {
+ inner: ArenaItem<ArcStateStore<K, V>, S>
+}
+
+impl<K, V, S> Dispose for StateStore<K, V, S> {
+ fn dispose(self) {
+ self.inner.dispose()
+ }
+}
+
+impl<K, V> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static {
+ pub fn new() -> Self {
+ Self::new_with_storage()
+ }
+}
+
+impl<K, V, S> StateStore<K, V, S> where K: 'static, V: 'static, S: Storage<ArcStateStore<K, V>> {
+ pub fn new_with_storage() -> Self {
+ Self {
+ inner: ArenaItem::new_with_storage(ArcStateStore::new()),
}
}
+}
- pub fn init() {
+impl<K, V> StateStore<K, V, LocalStorage> where K: 'static, V: 'static {
+ pub fn new_local() -> Self {
+ Self::new_with_storage()
+ }
+}
+impl<K: std::marker::Send + std::marker::Sync + 'static, V: std::marker::Send + std::marker::Sync + 'static> From<ArcStateStore<K, V>> for StateStore<K, V> {
+ fn from(value: ArcStateStore<K, V>) -> Self {
+ Self {
+ inner: ArenaItem::new_with_storage(value),
+ }
}
}
-impl<K: Eq + std::hash::Hash , V> StateStore<K, V> {
+impl<K: 'static, V: 'static> FromLocal<ArcStateStore<K, V>> for StateStore<K, V, LocalStorage> {
+ fn from_local(value: ArcStateStore<K, V>) -> Self {
+ Self {
+ inner: ArenaItem::new_with_storage(value),
+ }
+ }
+}
+
+impl<K, V, S> Copy for StateStore<K, V, S> {}
+
+impl<K, V, S> Clone for StateStore<K, V, S> {
+ fn clone(&self) -> Self {
+ *self
+ }
+}
+
+impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static {
+ pub fn store(&self, key: K, value: V) -> StateListener<K, V> {
+ {
+ let store = self.inner.try_get_value().unwrap();
+ let mut store = store.store.write().unwrap();
+ debug!("got store");
+ debug!("got store 2");
+ if let Some((v, count)) = store.get_mut(&key) {
+ *v = value.clone();
+ *count += 1;
+ } else {
+ store.insert(key.clone(), (value.clone(), 1));
+ }
+ };
+ StateListener {
+ value,
+ cleaner: StateCleaner { key, state_store: self.clone() },
+ }
+ }
+}
+
+impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static {
pub fn update(&self, key: &K, value: V) {
- if let Some((v, _)) = self.store.borrow_mut().get_mut(key) {
+ let store = self.inner.try_get_value().unwrap();
+ let mut store = store.store.write().unwrap();
+ if let Some((v, _)) = store.get_mut(key) {
*v = value;
}
}
pub fn modify(&self, key: &K, modify: impl Fn(&mut V)) {
- if let Some((v, _)) = self.store.borrow_mut().get_mut(key) {
+ let store = self.inner.try_get_value().unwrap();
+ let mut store = store.store.write().unwrap();
+ if let Some((v, _)) = store.get_mut(key) {
modify(v);
}
}
fn remove(&self, key: &K) {
- let mut store = self.store.borrow_mut();
+ let store = self.inner.try_get_value().unwrap();
+ let mut store = store.store.write().unwrap();
if let Some((_v, count)) = store.get_mut(key) {
*count -= 1;
if *count == 0 {
store.remove(key);
+ debug!("dropped item from store");
}
}
}
}
#[derive(Clone)]
-struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static {
+struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static + std::marker::Send + std::marker::Sync, V: 'static + std::marker::Send + std::marker::Sync {
value: V,
cleaner: StateCleaner<K, V>
}
-impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> {
+impl<K: std::cmp::Eq + std::hash::Hash + std::marker::Send + std::marker::Sync, V: std::marker::Send + std::marker::Sync> Deref for StateListener<K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
@@ -359,35 +491,41 @@ impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> {
}
}
-impl<K: std::cmp::Eq + std::hash::Hash, V> DerefMut for StateListener<K, V> {
+impl<K: std::cmp::Eq + std::hash::Hash + Send + Sync, V: Send + Sync> DerefMut for StateListener<K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
-struct StateCleaner<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static {
+struct ArcStateCleaner<K, V> {
+ key: K,
+ state_store: ArcStateStore<K, V>,
+}
+
+struct StateCleaner<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static {
key: K,
- _ty: PhantomData<V>,
- // state_store: StateStore<K, V>,
+ state_store: StateStore<K, V>,
}
-impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone {
+impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone + Send + Sync, V: Send + Sync {
fn clone(&self) -> Self {
- let state_store = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap();
- if let Some((_v, count)) = state_store.read_untracked().store.borrow_mut().get_mut(&self.key) {
- *count += 1;
+ {
+ let store = self.state_store.inner.try_get_value().unwrap();
+ let mut store = store.store.write().unwrap();
+ if let Some((_v, count)) = store.get_mut(&self.key) {
+ *count += 1;
+ }
}
Self {
key: self.key.clone(),
- _ty: PhantomData,
+ state_store: self.state_store.clone(),
}
}
}
-impl<K: Eq + std::hash::Hash + 'static, V: 'static> Drop for StateCleaner<K, V> {
+impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static> Drop for StateCleaner<K, V> {
fn drop(&mut self) {
- let state_store = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap();
- state_store.read_untracked().remove(&self.key)
+ self.state_store.remove(&self.key);
}
}
@@ -399,10 +537,10 @@ struct MacawChat {
impl MacawChat {
fn got_chat_and_user(chat: Chat, user: User) -> Self {
- let chat_state_store: RwSignal<StateStore<JID, Store<Chat>>, LocalStorage> = use_context().expect("no chat state store");
- let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store");
- let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user));
- let chat = chat_state_store.read_untracked().store(chat.correspondent.clone(), Store::new(chat));
+ let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store");
+ let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store");
+ let user = user_state_store.store(user.jid.clone(), Store::new(user));
+ let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat));
Self {
chat,
user,
@@ -432,10 +570,10 @@ struct MacawMessage {
impl MacawMessage {
fn got_message_and_user(message: Message, user: User) -> Self {
- let message_state_store: RwSignal<StateStore<Uuid, Store<Message>>, LocalStorage> = use_context().expect("no message state store");
- let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store");
- let message = message_state_store.read_untracked().store(message.id, Store::new(message));
- let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user));
+ let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store");
+ let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store");
+ let message = message_state_store.store(message.id, Store::new(message));
+ let user = user_state_store.store(user.jid.clone(), Store::new(user));
Self {
message,
user,
@@ -477,7 +615,6 @@ impl DerefMut for MacawUser {
#[component]
fn ChatsList() -> impl IntoView {
- let client = use_context::<Client>().expect("client not in context");
let (chats, set_chats) = signal(IndexMap::new());
let load_chats = LocalResource::new(move || async move {
@@ -497,25 +634,27 @@ fn ChatsList() -> impl IntoView {
});
// TODO: filter new messages signal
- let new_messages_signal: ReadSignal<Option<(JID, MacawMessage)>> = use_context().unwrap();
- spawn_local(async move {
- let mut new_message = new_messages_signal.to_stream();
+ let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap();
+ let _load_new_messages = LocalResource::new(move || async move {
load_chats.await;
- while let Some(new_message) = new_message.next().await {
+ let mut new_messages = new_messages_signal.write().subscribe_all();
+ while let Some((to, new_message)) = new_messages.recv().await {
debug!("got new message in let");
- if let Some((to, new_message)) = new_message {
- if let Some((chat, _latest_message)) = set_chats.write().shift_remove(&to) {
- debug!("chat existed");
- set_chats.write().insert_before(0, to, (chat, new_message));
- debug!("done setting");
- } else {
- debug!("the chat didn't exist");
- let chat = client.get_chat(to.clone()).await.unwrap();
- let user = client.get_user(to.clone()).await.unwrap();
- let chat = MacawChat::got_chat_and_user(chat, user);
- set_chats.write().insert_before(0, to, (chat, new_message));
- debug!("done setting");
- }
+ let mut chats = set_chats.write();
+ if let Some((chat, _latest_message)) = chats.shift_remove(&to) {
+ debug!("chat existed");
+ chats.insert_before(0, to, (chat.clone(), new_message));
+ debug!("done setting");
+ } else {
+ debug!("the chat didn't exist");
+ let client = use_context::<Client>().expect("client not in context");
+ let chat = client.get_chat(to.clone()).await.unwrap();
+ let user = client.get_user(to.clone()).await.unwrap();
+ debug!("before got chat");
+ let chat = MacawChat::got_chat_and_user(chat, user);
+ debug!("after got chat");
+ chats.insert_before(0, to, (chat, new_message));
+ debug!("done setting");
}
}
debug!("set the new message");