use std::{
borrow::Borrow,
cell::RefCell,
collections::HashMap,
marker::PhantomData,
ops::{Deref, DerefMut},
rc::Rc,
str::FromStr,
sync::{atomic::AtomicUsize, Arc, RwLock},
thread::sleep,
time::{self, Duration},
};
use filamento::{
chat::{Chat, Message},
db::Db,
error::{CommandError, ConnectionError, DatabaseError},
files::FilesMem,
user::User,
UpdateMessage,
};
use futures::stream::StreamExt;
use indexmap::IndexMap;
use jid::JID;
use leptos::{
prelude::*,
task::{spawn, spawn_local},
};
use leptos_meta::Stylesheet;
use reactive_stores::Store;
use stylance::import_style;
use thiserror::Error;
use tokio::sync::{mpsc::{self, Receiver}, Mutex};
use tracing::debug;
use uuid::Uuid;
/// Top-level UI state: selects between the login page and the main client view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppState {
    /// No session is active; the login page is rendered.
    LoggedOut,
    /// A client has been created at login; the main view is rendered.
    LoggedIn,
}
/// Application-side wrapper around a `filamento::Client<FilesMem>`.
///
/// Built by the login flow, which also stores the account JID and the
/// in-memory file store that was handed to the filamento client.
/// Dereferences to the inner filamento client.
#[derive(Clone)]
pub struct Client {
// The wrapped filamento client; target of `Deref`/`DerefMut`.
client: filamento::Client<FilesMem>,
// JID the client was created with at login.
jid: Arc<JID>,
// Clone of the `FilesMem` passed to `filamento::Client::new` at login.
file_store: FilesMem,
}
/// Lets a `Client` be used wherever the wrapped filamento client is expected.
impl Deref for Client {
    type Target = filamento::Client<FilesMem>;

    /// Borrows the wrapped filamento client.
    fn deref(&self) -> &Self::Target {
        let Self { client, .. } = self;
        client
    }
}
/// Mutable counterpart of the `Deref` impl for `Client`.
impl DerefMut for Client {
    /// Mutably borrows the wrapped filamento client.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { client, .. } = self;
        client
    }
}
/// Root component: renders the login page while logged out and the main
/// `Macaw` view once the login flow has stored a client and switched the
/// state to `LoggedIn`.
#[component]
pub fn App() -> impl IntoView {
    let (app, set_app) = signal(AppState::LoggedOut);
    // Slot the login page fills with the freshly created client and its update
    // receiver; it is emptied (taken) when the main view is built.
    let client: RwSignal<Option<(Client, Receiver<UpdateMessage>)>> = RwSignal::new(None);
    view! {
        {move || {
            // Reduce the state to a plain bool so the read guard is released at
            // the end of this statement. The fallback below writes to the same
            // signal, which must not happen while a read guard is still alive.
            let logged_in = match *app.read() {
                AppState::LoggedOut => false,
                AppState::LoggedIn => true,
            };
            if !logged_in {
                return view! { <LoginPage set_app set_client=client /> }.into_any();
            }
            // Take the session in its own statement so the write guard is
            // dropped before any view is constructed.
            let session = client.write_untracked().take();
            match session {
                Some((client, updates)) => view! { <Macaw client updates /> }.into_any(),
                None => {
                    // Marked as logged in but no client is available: fall
                    // back to the login page.
                    set_app.set(AppState::LoggedOut);
                    view! { <LoginPage set_app set_client=client /> }.into_any()
                }
            }
        }}
    }
}
/// Errors the login form can display to the user.
#[derive(Clone, Debug, Error)]
pub enum LoginError {
/// The password field was empty when the form was submitted.
#[error("Missing Password")]
MissingPassword,
/// The JID field was empty when the form was submitted.
#[error("Missing JID")]
MissingJID,
/// The entered JID could not be parsed.
#[error("Invalid JID: {0}")]
InvalidJID(#[from] jid::ParseError),
/// Connecting the freshly created client failed.
#[error("Connection Error: {0}")]
ConnectionError(#[from] CommandError<ConnectionError>),
}
#[component]
fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client, Receiver<UpdateMessage>)>>) -> impl IntoView {
let jid = RwSignal::new("".to_string());
let password = RwSignal::new("".to_string());
let remember_me = RwSignal::new(false);
let connect_on_login = RwSignal::new(true);
let (error, set_error) = signal(None::<LoginError>);
let error_message = move || {
error.with(|error| {
if let Some(error) = error {
view! { <div class="error">{error.to_string()}</div> }.into_any()
} else {
view! {}.into_any()
}
})
};
let (login_pending, set_login_pending) = signal(false);
let login = Action::new_local(move |_| {
async move {
set_login_pending.set(true);
if jid.read_untracked().is_empty() {
set_error.set(Some(LoginError::MissingJID));
set_login_pending.set(false);
return
}
if password.read_untracked().is_empty() {
set_error.set(Some(LoginError::MissingPassword));
set_login_pending.set(false);
return
}
let jid = match JID::from_str(&jid.read_untracked()) {
Ok(j) => j,
Err(e) => {
set_error.set(Some(e.into()));
set_login_pending.set(false);
return
},
};
// initialise the client
let db = Db::create_connect_and_migrate("mem.db").await.unwrap();
let files_mem = FilesMem::new();
let (client, updates) =
filamento::Client::new(jid.clone(), password.read_untracked().clone(), db, files_mem.clone());
// TODO: remember_me
let client = Client {
client,
jid: Arc::new(jid),
file_store: files_mem,
};
if *connect_on_login.read_untracked() {
match client.connect().await {
Ok(_) => {},
Err(e) => {
set_error.set(Some(e.into()));
set_login_pending.set(false);
return
},
}
}
// debug!("before setting app state");
set_client.set(Some((client, updates)));
set_app.set(AppState::LoggedIn);
}
});
view! {
<div class="center fill">
<div id="login-form" class="panel">
<div id="hero">
<img src="/assets/icon.png" />
<h1>Macaw Instant Messenger</h1>
</div>
{error_message}
<form on:submit=move |ev| {
ev.prevent_default();
login.dispatch(());
}>
<label for="jid">JID</label>
<input
disabled=login_pending
placeholder="caw@macaw.chat"
type="text"
bind:value=jid
name="jid"
id="jid"
autofocus="true"
/>
<label for="password">Password</label>
<input
disabled=login_pending
placeholder="••••••••"
type="password"
bind:value=password
name="password"
id="password"
/>
<div>
<label for="remember_me">Remember me</label>
<input
disabled=login_pending
type="checkbox"
bind:checked=remember_me
name="remember_me"
id="remember_me"
/>
</div>
<div>
<label for="connect_on_login">Connect on login</label>
<input
disabled=login_pending
type="checkbox"
bind:checked=connect_on_login
name="connect_on_login"
id="connect_on_login"
/>
</div>
<input disabled=login_pending class="button" type="submit" value="Log In" />
</form>
</div>
</div>
}
}
/// Registry of channels that want to be told about incoming messages.
pub struct MessageSubscriptions {
// Subscribers that receive every message, paired with the chat JID it was broadcast to.
all: Vec<mpsc::Sender<(JID, MacawMessage)>>,
// Subscribers that receive only the messages broadcast to one particular chat JID.
subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>,
}
impl Default for MessageSubscriptions {
    /// Same as [`MessageSubscriptions::new`]: a registry with no subscribers.
    fn default() -> Self {
        Self::new()
    }
}

impl MessageSubscriptions {
    /// Buffer size of every subscription channel handed out by this registry.
    const CHANNEL_CAPACITY: usize = 10;

    /// Creates a registry with no subscribers.
    pub fn new() -> Self {
        Self {
            all: Vec::new(),
            subset: HashMap::new(),
        }
    }

    /// Sends `message` to every "all" subscriber and to every subscriber of chat `to`.
    ///
    /// Each send is awaited in turn. A sender whose `send` fails is removed
    /// from the registry with `swap_remove`, so subscriber order is not
    /// preserved. A per-chat list that becomes empty is removed from the map.
    pub async fn broadcast(
        &mut self,
        to: JID,
        message: MacawMessage,
    ) {
        // subscriptions to all
        let mut index = 0;
        while let Some(sender) = self.all.get(index) {
            match sender.send((to.clone(), message.clone())).await {
                Ok(_) => {
                    index += 1;
                }
                Err(_) => {
                    // `swap_remove` moves the last sender into `index`, so the
                    // index is deliberately not advanced here.
                    self.all.swap_remove(index);
                }
            }
        }
        // subscriptions to specific chat
        if let Some(subscribers) = self.subset.get_mut(&to) {
            let mut index = 0;
            while let Some(sender) = subscribers.get(index) {
                match sender.send(message.clone()).await {
                    Ok(_) => {
                        index += 1;
                    }
                    Err(_) => {
                        subscribers.swap_remove(index);
                    }
                }
            }
            if subscribers.is_empty() {
                self.subset.remove(&to);
            }
        }
    }

    /// Registers a subscriber for messages of every chat and returns its receiving end.
    pub fn subscribe_all(
        &mut self,
    ) -> Receiver<(JID, MacawMessage)> {
        let (send, recv) = mpsc::channel(Self::CHANNEL_CAPACITY);
        self.all.push(send);
        recv
    }

    /// Registers a subscriber for messages of `chat` only and returns its receiving end.
    pub fn subscribe_chat(
        &mut self,
        chat: JID,
    ) -> Receiver<MacawMessage> {
        let (send, recv) = mpsc::channel(Self::CHANNEL_CAPACITY);
        // One lookup: creates the per-chat list on first subscription.
        self.subset.entry(chat).or_default().push(send);
        recv
    }
}
/// Main view shown after login.
///
/// Puts the client, the message subscription registry and the message, chat
/// and user state stores into context, starts a loop that drains `updates`,
/// and renders the chats list.
#[component]
fn Macaw(
    // TODO: logout
    // app_state: WriteSignal<Option<essage>)>, LocalStorage>,
    client: Client,
    mut updates: Receiver<UpdateMessage>,
) -> impl IntoView {
    provide_context(client);

    let subscriptions = RwSignal::new(MessageSubscriptions::new());
    provide_context(subscriptions);

    let message_states: StateStore<Uuid, Store<Message>> = StateStore::new();
    provide_context(message_states);

    let chat_states: StateStore<JID, Store<Chat>> = StateStore::new();
    provide_context(chat_states);

    let user_states: StateStore<JID, Store<User>> = StateStore::new();
    provide_context(user_states);

    OnceResource::new(async move {
        while let Some(update) = updates.recv().await {
            match update {
                UpdateMessage::Message { to, from, message } => {
                    // debug!("before got message");
                    let incoming = MacawMessage::got_message_and_user(message, from);
                    // debug!("after got message");
                    // NOTE(review): the write guard on the subscriptions signal
                    // stays alive while `broadcast` is awaited — confirm nothing
                    // else needs the signal in the meantime.
                    spawn_local(async move {
                        subscriptions.write_untracked().broadcast(to, incoming).await
                    });
                    // debug!("after set message");
                }
                // Not handled yet: these updates are received and discarded.
                UpdateMessage::Online(..) => {}
                UpdateMessage::Offline(..) => {}
                UpdateMessage::RosterUpdate(..) => {}
                UpdateMessage::RosterDelete(..) => {}
                UpdateMessage::Presence { .. } => {}
                UpdateMessage::MessageDelivery { .. } => {}
                UpdateMessage::SubscriptionRequest(..) => {}
                UpdateMessage::NickChanged { .. } => {}
                UpdateMessage::AvatarChanged { .. } => {}
            }
        }
    });

    view! { <ChatsList /> }
}
/// Shared, lock-protected map from a key to `(value, count)`.
///
/// Clones share the same underlying map, and equality is identity of that map
/// (pointer equality), not a comparison of contents.
// Original author's note: V has to be an arc signal
#[derive(Debug)]
struct ArcStateStore<K, V> {
    store: Arc<RwLock<HashMap<K, (V, usize)>>>,
}

impl<K, V> PartialEq for ArcStateStore<K, V> {
    /// Two handles are equal when they point at the same map.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.store, &other.store)
    }
}

impl<K, V> Eq for ArcStateStore<K, V> {}

// Implemented by hand so that no `K: Clone` / `V: Clone` bound is required.
impl<K, V> Clone for ArcStateStore<K, V> {
    /// Returns another handle to the same map.
    fn clone(&self) -> Self {
        Self {
            store: Arc::clone(&self.store),
        }
    }
}

// Implemented by hand so that no `K: Default` / `V: Default` bound is required.
impl<K, V> Default for ArcStateStore<K, V> {
    /// Creates a handle to a fresh, empty map.
    fn default() -> Self {
        Self {
            store: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl<K, V> ArcStateStore<K, V> {
    /// Creates a handle to a fresh, empty map.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Copyable handle to an `ArcStateStore` kept in an `ArenaItem`.
///
/// `S` selects the arena storage and defaults to `SyncStorage`.
#[derive(Debug)]
struct StateStore<K, V, S = SyncStorage> {
// Arena slot holding the shared store.
inner: ArenaItem<ArcStateStore<K, V>, S>
}
impl<K, V, S> Dispose for StateStore<K, V, S> {
    /// Disposes the arena item backing this handle.
    fn dispose(self) {
        let Self { inner } = self;
        inner.dispose()
    }
}
impl<K, V> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static {
pub fn new() -> Self {
Self::new_with_storage()
}
}
impl<K, V, S> StateStore<K, V, S>
where
    K: 'static,
    V: 'static,
    S: Storage<ArcStateStore<K, V>>,
{
    /// Creates an empty store whose arena item lives in storage `S`.
    pub fn new_with_storage() -> Self {
        let backing = ArcStateStore::new();
        let inner = ArenaItem::new_with_storage(backing);
        Self { inner }
    }
}
impl<K, V> StateStore<K, V, LocalStorage> where K: 'static, V: 'static {
pub fn new_local() -> Self {
Self::new_with_storage()
}
}
impl<K, V> From<ArcStateStore<K, V>> for StateStore<K, V>
where
    K: Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    /// Places an existing shared store into the arena and returns a handle to it.
    fn from(value: ArcStateStore<K, V>) -> Self {
        let inner = ArenaItem::new_with_storage(value);
        Self { inner }
    }
}
impl<K, V> FromLocal<ArcStateStore<K, V>> for StateStore<K, V, LocalStorage>
where
    K: 'static,
    V: 'static,
{
    /// Places an existing shared store into `LocalStorage` and returns a handle to it.
    fn from_local(value: ArcStateStore<K, V>) -> Self {
        let inner = ArenaItem::new_with_storage(value);
        Self { inner }
    }
}
// The handle is only an arena reference, so it is `Copy` regardless of `K`, `V` and `S`.
impl<K, V, S> Copy for StateStore<K, V, S> {}

impl<K, V, S> Clone for StateStore<K, V, S> {
    /// Copies the handle; the underlying store is not duplicated.
    fn clone(&self) -> Self {
        Self { inner: self.inner }
    }
}
impl<K, V> StateStore<K, V>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Records `value` under `key` and returns a listener for it.
    ///
    /// If the key is already present its value is replaced and its count is
    /// incremented; otherwise the entry is inserted with a count of 1. The
    /// returned listener carries a `StateCleaner` for `key`.
    ///
    /// Panics if the arena item has been disposed or the lock is poisoned.
    pub fn store(&self, key: K, value: V) -> StateListener<K, V> {
        {
            let shared = self.inner.try_get_value().unwrap();
            let mut entries = shared.store.write().unwrap();
            debug!("got store");
            debug!("got store 2");
            match entries.get_mut(&key) {
                Some((current, listeners)) => {
                    *current = value.clone();
                    *listeners += 1;
                }
                None => {
                    entries.insert(key.clone(), (value.clone(), 1));
                }
            }
        }
        let cleaner = StateCleaner { key, state_store: *self };
        StateListener { value, cleaner }
    }
}
impl<K, V> StateStore<K, V>
where
    K: Eq + std::hash::Hash + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    /// Replaces the value stored under `key`; does nothing if the key is absent.
    ///
    /// Panics if the arena item has been disposed or the lock is poisoned.
    pub fn update(&self, key: &K, value: V) {
        let store = self.inner.try_get_value().unwrap();
        let mut store = store.store.write().unwrap();
        if let Some((v, _)) = store.get_mut(key) {
            *v = value;
        }
    }

    /// Applies `modify` to the value stored under `key`; does nothing if the
    /// key is absent.
    ///
    /// Panics if the arena item has been disposed or the lock is poisoned.
    pub fn modify(&self, key: &K, modify: impl FnOnce(&mut V)) {
        let store = self.inner.try_get_value().unwrap();
        let mut store = store.store.write().unwrap();
        if let Some((v, _)) = store.get_mut(key) {
            modify(v);
        }
    }

    /// Decrements the count for `key` and removes the entry when it reaches zero.
    ///
    /// This is what `StateCleaner::drop` calls, so it must never panic: a
    /// disposed arena item is treated as "nothing left to clean up", and a
    /// poisoned lock is recovered instead of unwrapped.
    fn remove(&self, key: &K) {
        let store = match self.inner.try_get_value() {
            Some(store) => store,
            None => return,
        };
        let mut store = store
            .store
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some((_v, count)) = store.get_mut(key) {
            // Entries are inserted with a count of 1; saturate so an
            // unexpected 0 cannot underflow inside a destructor.
            *count = count.saturating_sub(1);
            if *count == 0 {
                store.remove(key);
                debug!("dropped item from store");
            }
        }
    }
}
/// A value handed out by `StateStore::store`, bundled with the
/// `StateCleaner` for its key.
///
/// Dereferences to the value. Cloning or dropping the listener clones or
/// drops the cleaner, which adjusts the count kept for the key in the store.
#[derive(Clone)]
struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static + std::marker::Send + std::marker::Sync, V: 'static + std::marker::Send + std::marker::Sync {
// The listener's own copy of the stored value.
value: V,
// Keeps the store's count for the key in step with this listener's lifetime.
cleaner: StateCleaner<K, V>
}
impl<K, V> Deref for StateListener<K, V>
where
    K: Eq + std::hash::Hash + Send + Sync,
    V: Send + Sync,
{
    type Target = V;

    /// Borrows the value carried by this listener.
    fn deref(&self) -> &Self::Target {
        let Self { value, .. } = self;
        value
    }
}
impl<K, V> DerefMut for StateListener<K, V>
where
    K: Eq + std::hash::Hash + Send + Sync,
    V: Send + Sync,
{
    /// Mutably borrows the value carried by this listener.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { value, .. } = self;
        value
    }
}
/// A key paired with the `ArcStateStore` it belongs to.
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably the Arc-based counterpart of `StateCleaner` — confirm before relying on it.
struct ArcStateCleaner<K, V> {
key: K,
state_store: ArcStateStore<K, V>,
}
/// Guard that ties one count of a store entry to its own lifetime.
///
/// Cloning increments the count kept for `key` in `state_store`; dropping
/// calls `StateStore::remove`, which decrements it and removes the entry at zero.
struct StateCleaner<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static {
// Key of the entry this guard accounts for.
key: K,
// Handle to the store that holds the entry.
state_store: StateStore<K, V>,
}
impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone + Send + Sync, V: Send + Sync {
    /// Creates another guard for the same key, incrementing the count kept
    /// for that key if the entry is still present.
    ///
    /// Panics if the arena item has been disposed or the lock is poisoned.
    fn clone(&self) -> Self {
        {
            let shared = self.state_store.inner.try_get_value().unwrap();
            let mut entries = shared.store.write().unwrap();
            if let Some(entry) = entries.get_mut(&self.key) {
                entry.1 += 1;
            }
        }
        let key = self.key.clone();
        Self {
            key,
            state_store: self.state_store,
        }
    }
}
impl<K, V> Drop for StateCleaner<K, V>
where
    K: Eq + std::hash::Hash + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    /// Releases this guard's count on its key via `StateStore::remove`.
    fn drop(&mut self) {
        let Self { key, state_store } = &*self;
        state_store.remove(key);
    }
}
/// A chat together with a user, both held as listeners on the shared state stores.
///
/// Dereferences to the chat listener.
#[derive(Clone)]
struct MacawChat {
// Listener for the chat's `Store<Chat>`.
chat: StateListener<JID, Store<Chat>>,
// Listener for the `Store<User>` supplied alongside the chat.
user: StateListener<JID, Store<User>>,
}
impl MacawChat {
    /// Registers `chat` and `user` in the state stores found in context and
    /// wraps the resulting listeners.
    ///
    /// The user is keyed by its `jid`, the chat by its `correspondent`.
    /// Panics if either state store is missing from context.
    fn got_chat_and_user(chat: Chat, user: User) -> Self {
        let chats = use_context::<StateStore<JID, Store<Chat>>>().expect("no chat state store");
        let users = use_context::<StateStore<JID, Store<User>>>().expect("no user state store");

        let user_key = user.jid.clone();
        let user = users.store(user_key, Store::new(user));

        let chat_key = chat.correspondent.clone();
        let chat = chats.store(chat_key, Store::new(chat));

        Self { chat, user }
    }
}
impl Deref for MacawChat {
    type Target = StateListener<JID, Store<Chat>>;

    /// Borrows the chat listener.
    fn deref(&self) -> &Self::Target {
        let Self { chat, .. } = self;
        chat
    }
}
impl DerefMut for MacawChat {
    /// Mutably borrows the chat listener.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { chat, .. } = self;
        chat
    }
}
/// A message together with a user, both held as listeners on the shared state stores.
///
/// Dereferences to the message listener.
#[derive(Clone)]
struct MacawMessage {
// Listener for the message's `Store<Message>`.
message: StateListener<Uuid, Store<Message>>,
// Listener for the `Store<User>` supplied alongside the message.
user: StateListener<JID, Store<User>>,
}
impl MacawMessage {
    /// Registers `message` and `user` in the state stores found in context
    /// and wraps the resulting listeners.
    ///
    /// The message is keyed by its `id`, the user by its `jid`.
    /// Panics if either state store is missing from context.
    fn got_message_and_user(message: Message, user: User) -> Self {
        let messages = use_context::<StateStore<Uuid, Store<Message>>>().expect("no message state store");
        let users = use_context::<StateStore<JID, Store<User>>>().expect("no user state store");

        let message_key = message.id;
        let message = messages.store(message_key, Store::new(message));

        let user_key = user.jid.clone();
        let user = users.store(user_key, Store::new(user));

        Self { message, user }
    }
}
impl Deref for MacawMessage {
    type Target = StateListener<Uuid, Store<Message>>;

    /// Borrows the message listener.
    fn deref(&self) -> &Self::Target {
        let Self { message, .. } = self;
        message
    }
}
impl DerefMut for MacawMessage {
    /// Mutably borrows the message listener.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { message, .. } = self;
        message
    }
}
/// A user held as a listener on the shared user state store.
///
/// Dereferences to the user listener.
struct MacawUser {
// Listener for the user's `Store<User>`.
user: StateListener<JID, Store<User>>,
}
impl Deref for MacawUser {
    type Target = StateListener<JID, Store<User>>;

    /// Borrows the user listener.
    fn deref(&self) -> &Self::Target {
        let Self { user } = self;
        user
    }
}
impl DerefMut for MacawUser {
    /// Mutably borrows the user listener.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { user } = self;
        user
    }
}
#[component]
fn ChatsList() -> impl IntoView {
let (chats, set_chats) = signal(IndexMap::new());
let load_chats = LocalResource::new(move || async move {
let client = use_context::<Client>().expect("client not in context");
let chats = client.get_chats_ordered_with_latest_messages_and_users().await.map_err(|e| e.to_string());
match chats {
Ok(c) => {
let chats = c.into_iter().map(|((chat, chat_user), (message, message_user))| {
(chat.correspondent.clone(), (MacawChat::got_chat_and_user(chat, chat_user), MacawMessage::got_message_and_user(message, message_user)))
}).collect::<IndexMap<JID, _>>();
set_chats.set(chats);
},
Err(_) => {
// TODO: show error message at top of chats list
},
}
});
// TODO: filter new messages signal
let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap();
let _load_new_messages = LocalResource::new(move || async move {
load_chats.await;
let mut new_messages = new_messages_signal.write().subscribe_all();
while let Some((to, new_message)) = new_messages.recv().await {
debug!("got new message in let");
let mut chats = set_chats.write();
if let Some((chat, _latest_message)) = chats.shift_remove(&to) {
debug!("chat existed");
chats.insert_before(0, to, (chat.clone(), new_message));
debug!("done setting");
} else {
debug!("the chat didn't exist");
let client = use_context::<Client>().expect("client not in context");
let chat = client.get_chat(to.clone()).await.unwrap();
let user = client.get_user(to.clone()).await.unwrap();
debug!("before got chat");
let chat = MacawChat::got_chat_and_user(chat, user);
debug!("after got chat");
chats.insert_before(0, to, (chat, new_message));
debug!("done setting");
}
}
debug!("set the new message");
});
view! {
<div class="chats-list panel">
<h2>Chats</h2>
<div>
<For each=move || chats.get() key=|chat| chat.0.clone() let(chat)>
<p>{chat.0.to_string()}</p>
</For>
</div>
</div>
}
}