From 5272456beab233acb53a140db6a85da5f4ccd2d2 Mon Sep 17 00:00:00 2001 From: cel 🌸 Date: Tue, 25 Mar 2025 22:49:38 +0000 Subject: refactor(luz): separate logic from xmpp core --- luz/scratch | 87 ++ luz/src/connection/mod.rs | 263 +++--- luz/src/connection/read.rs | 383 ++------- luz/src/connection/write.rs | 50 +- luz/src/lib.rs | 1884 +++++++++++++++++++++++++------------------ luz/src/main.rs | 6 +- 6 files changed, 1390 insertions(+), 1283 deletions(-) create mode 100644 luz/scratch diff --git a/luz/scratch b/luz/scratch new file mode 100644 index 0000000..9954aef --- /dev/null +++ b/luz/scratch @@ -0,0 +1,87 @@ +# logic: + +- db +- pending iqs +- ui update sender + + ## logic methods + + - handle_offline: called by lamp + - handle_online: called by lamp + - handle_stanza: called by read thread + + - handle_connect: called by lamp + - handle_disconnect: called by lamp + + - handle_error?: called by lamp handle threads and read thread for error logging + - handle_stream_error: called by supervisor when stream needs to be reset + + +# lamp: + +- login jid (bare or full) +- password provider +- lamp command receiver +- connected session state(connected, supervisorhandle to disconnect current connected session) +- connected state (if intended to be connected or not, for retrying reconnection every minute or something) +- on_crash for connection supervisor +- internal logic struct which has methods to handle logic commands + +# connected: + +- writehandle +- current full jid for connected session + +# supervisor: + +- control_recv +- read_thread_crash +- write_thread_crash +- read_control_handle +- write_control_handle +- on_crash +- connected +- password +- logic + +# read: + +must be passed around when crash +- supervisor_control +- tasks + +can be cloned from supervisor +- connected . +- logic . + +can be recreated by supervisor +- stream +- disconnecting +- disconnect_timedout +- on_crash +- control_recv + +# write: + +must be passed around when crash +- stanza_recv + +can be recreated by supervisor +- stream +- on_crash +- control_recv + + +message types: + +command: + +- getroster +- send message +- etc. + +lamp commands: + +- connect +- disconnect +- command(command) diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index 84109bc..e209d47 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -9,139 +9,120 @@ use std::{ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use jid::JID; -use read::{ReadControl, ReadControlHandle}; +use read::{ReadControl, ReadControlHandle, ReadState}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; -use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; +use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; use crate::{ db::Db, error::{Error, ReadError, WriteError}, - UpdateMessage, + Connected, LogicState, UpdateMessage, }; mod read; pub(crate) mod write; pub struct Supervisor { - connection_commands: mpsc::Receiver, - writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver)>, - reader_crash: oneshot::Receiver<( - Db, - mpsc::Sender, - tokio::task::JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - sender: mpsc::Sender, - writer_handle: WriteControlHandle, - reader_handle: ReadControlHandle, - on_shutdown: oneshot::Sender<()>, - jid: Arc>, + command_recv: mpsc::Receiver, + reader_crash: oneshot::Receiver, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + // jid in connected stays the same over the life of the supervisor (the connection session) + connected: Connected, password: Arc, + logic: LogicState, } pub enum SupervisorCommand { Disconnect, // for if there was a stream error, require to reconnect // couldn't stream errors just cause a crash? lol - Reconnect(State), + Reconnect(ChildState), } -pub enum State { - Write(mpsc::Receiver), - Read( - ( - Db, - mpsc::Sender, - tokio::task::JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - ), - ), +pub enum ChildState { + Write(WriteState), + Read(ReadState), } impl Supervisor { fn new( - connection_commands: mpsc::Receiver, - writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver)>, - reader_crash: oneshot::Receiver<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - sender: mpsc::Sender, - writer_handle: WriteControlHandle, - reader_handle: ReadControlHandle, - on_shutdown: oneshot::Sender<()>, - jid: Arc>, + command_recv: mpsc::Receiver, + reader_crash: oneshot::Receiver, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + connected: Connected, password: Arc, + logic: LogicState, ) -> Self { Self { - connection_commands, - writer_crash, - sender, - writer_handle, - reader_handle, + command_recv, reader_crash, - on_shutdown, - jid, + writer_crash, + read_control_handle, + write_control_handle, + on_crash, + connected, password, + logic, } } async fn run(mut self) { loop { tokio::select! { - Some(msg) = self.connection_commands.recv() => { + Some(msg) = self.command_recv.recv() => { match msg { SupervisorCommand::Disconnect => { info!("disconnecting"); - let _ = self.writer_handle.send(WriteControl::Disconnect).await; - let _ = self.reader_handle.send(ReadControl::Disconnect).await; + // TODO: do handle_disconnect here + let _ = self.write_control_handle.send(WriteControl::Disconnect).await; + let _ = self.read_control_handle.send(ReadControl::Disconnect).await; info!("sent disconnect command"); tokio::select! { _ = async { tokio::join!( - async { let _ = (&mut self.writer_handle.handle).await; }, - async { let _ = (&mut self.reader_handle.handle).await; } + async { let _ = (&mut self.write_control_handle.handle).await; }, + async { let _ = (&mut self.read_control_handle.handle).await; } ) } => {}, + // TODO: config timeout _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { - (&mut self.reader_handle.handle).abort(); - (&mut self.writer_handle.handle).abort(); + (&mut self.read_control_handle.handle).abort(); + (&mut self.write_control_handle.handle).abort(); } } info!("disconnected"); break; }, + // TODO: Reconnect without aborting, gentle reconnect. SupervisorCommand::Reconnect(state) => { // TODO: please omfg // send abort to read stream, as already done, consider let (read_state, mut write_state); match state { // TODO: proper state things for read and write thread - State::Write(receiver) => { + ChildState::Write(receiver) => { write_state = receiver; let (send, recv) = oneshot::channel(); - let _ = self.reader_handle.send(ReadControl::Abort(send)).await; + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; if let Ok(state) = recv.await { read_state = state; } else { break } }, - State::Read(read) => { + ChildState::Read(read) => { read_state = read; let (send, recv) = oneshot::channel(); - let _ = self.writer_handle.send(WriteControl::Abort(send)).await; + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; // TODO: need a tokio select, in case the state arrives from somewhere else if let Ok(state) = recv.await { write_state = state; @@ -151,103 +132,99 @@ impl Supervisor { }, } - let mut jid = self.jid.lock().await; + let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); + // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { - let (read, write) = c.split(); let (send, recv) = oneshot::channel(); self.writer_crash = recv; - self.writer_handle = - WriteControlHandle::reconnect(write, send, write_state); + self.write_control_handle = + WriteControlHandle::reconnect(write, send, write_state.stanza_recv); let (send, recv) = oneshot::channel(); self.reader_crash = recv; - let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = read_state; - self.reader_handle = ReadControlHandle::reconnect( + self.read_control_handle = ReadControlHandle::reconnect( read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, send, - db, - update_sender, - supervisor_command, - write_sender, - tasks, - pending_iqs, ); }, Err(e) => { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. - write_state.close(); - while let Some(msg) = write_state.recv().await { + write_state.stanza_recv.close(); + while let Some(msg) = write_state.stanza_recv.recv().await { let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? - let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; + let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } }, } }, - Ok((write_msg, mut write_recv)) = &mut self.writer_crash => { + Ok((write_msg, mut write_state)) = &mut self.writer_crash => { // consider awaiting/aborting the read and write threads let (send, recv) = oneshot::channel(); - let _ = self.reader_handle.send(ReadControl::Abort(send)).await; - let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = tokio::select! { + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + let read_state = tokio::select! { Ok(s) = recv => s, Ok(s) = &mut self.reader_crash => s, // in case, just break as irrecoverable else => break, }; - let mut jid = self.jid.lock().await; + let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); + // TODO: same here let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { Ok(c) => { let (read, write) = c.split(); let (send, recv) = oneshot::channel(); self.writer_crash = recv; - self.writer_handle = - WriteControlHandle::reconnect_retry(write, send, write_msg, write_recv); + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg); let (send, recv) = oneshot::channel(); self.reader_crash = recv; - self.reader_handle = ReadControlHandle::reconnect( + self.read_control_handle = ReadControlHandle::reconnect( read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, send, - db, - update_sender, - supervisor_command, - write_sender, - tasks, - pending_iqs, ); }, Err(e) => { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. - write_recv.close(); + write_state.stanza_recv.close(); let _ = write_msg.respond_to.send(Err(WriteError::LostConnection)); - while let Some(msg) = write_recv.recv().await { + while let Some(msg) = write_state.stanza_recv.recv().await { let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error to send? - let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; + let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } }, - Ok((db, update_sender, tasks, supervisor_control, write_handle, pending_iqs)) = &mut self.reader_crash => { + Ok(read_state) = &mut self.reader_crash => { let (send, recv) = oneshot::channel(); - let _ = self.writer_handle.send(WriteControl::Abort(send)).await; - let (retry_msg, mut write_receiver) = tokio::select! { + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + let (retry_msg, mut write_state) = tokio::select! { Ok(s) = recv => (None, s), Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), // in case, just break as irrecoverable else => break, }; - let mut jid = self.jid.lock().await; + let mut jid = self.connected.jid.clone(); let mut domain = jid.domainpart.clone(); let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; match connection { @@ -256,35 +233,33 @@ impl Supervisor { let (send, recv) = oneshot::channel(); self.writer_crash = recv; if let Some(msg) = retry_msg { - self.writer_handle = - WriteControlHandle::reconnect_retry(write, send, msg, write_receiver); + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg); } else { - self.writer_handle = WriteControlHandle::reconnect(write, send, write_receiver) + self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv) } let (send, recv) = oneshot::channel(); self.reader_crash = recv; - self.reader_handle = ReadControlHandle::reconnect( + self.read_control_handle = ReadControlHandle::reconnect( read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, send, - db, - update_sender, - supervisor_control, - write_handle, - tasks, - pending_iqs, ); }, Err(e) => { // if reconnection failure, respond to all current messages with lost connection error. - write_receiver.close(); + write_state.stanza_recv.close(); if let Some(msg) = retry_msg { msg.respond_to.send(Err(WriteError::LostConnection)); } - while let Some(msg) = write_receiver.recv().await { + while let Some(msg) = write_state.stanza_recv.recv().await { msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? - let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; + let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } @@ -292,7 +267,8 @@ impl Supervisor { else => break, } } - let _ = self.on_shutdown.send(()); + // TODO: maybe don't just on_crash + let _ = self.on_crash.send(()); } } @@ -337,40 +313,47 @@ impl DerefMut for SupervisorSender { impl SupervisorHandle { pub fn new( streams: BoundJabberStream, - update_sender: mpsc::Sender, - db: Db, - on_shutdown: oneshot::Sender<()>, - jid: Arc>, + on_crash: oneshot::Sender<()>, + jid: JID, password: Arc, - pending_iqs: Arc>>>>, + logic: LogicState, ) -> (WriteHandle, Self) { - let (command_sender, command_receiver) = mpsc::channel(20); - let (writer_error_sender, writer_error_receiver) = oneshot::channel(); - let (reader_crash_sender, reader_crash_receiver) = oneshot::channel(); + let (command_send, command_recv) = mpsc::channel(20); + let (writer_crash_send, writer_crash_recv) = oneshot::channel(); + let (reader_crash_send, reader_crash_recv) = oneshot::channel(); + + let (read_stream, write_stream) = streams.split(); - let (reader, writer) = streams.split(); let (write_handle, write_control_handle) = - WriteControlHandle::new(writer, writer_error_sender); - let jabber_reader_control_handle = ReadControlHandle::new( - reader, - reader_crash_sender, - db, - update_sender.clone(), - command_sender.clone(), - write_handle.clone(), - pending_iqs, + WriteControlHandle::new(write_stream, writer_crash_send); + + let connected = Connected { + jid, + write_handle: write_handle.clone(), + }; + + let supervisor_sender = SupervisorSender { + sender: command_send, + }; + + let read_control_handle = ReadControlHandle::new( + read_stream, + connected.clone(), + logic.clone(), + supervisor_sender.clone(), + reader_crash_send, ); let actor = Supervisor::new( - command_receiver, - writer_error_receiver, - reader_crash_receiver, - update_sender, + command_recv, + reader_crash_recv, + writer_crash_recv, + read_control_handle, write_control_handle, - jabber_reader_control_handle, - on_shutdown, - jid, + on_crash, + connected, password, + logic, ); let handle = tokio::spawn(async move { actor.run().await }); @@ -378,9 +361,7 @@ impl SupervisorHandle { ( write_handle, Self { - sender: SupervisorSender { - sender: command_sender, - }, + sender: supervisor_sender, handle, }, ) diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index d510c5d..d310a12 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -22,66 +22,59 @@ use crate::{ error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, - UpdateMessage, + Connected, LogicState, UpdateMessage, }; -use super::{write::WriteHandle, SupervisorCommand}; +use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; +/// read actor pub struct Read { - control_receiver: mpsc::Receiver, stream: BoundJabberReader, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - db: Db, - update_sender: mpsc::Sender, - supervisor_control: mpsc::Sender, - write_handle: WriteHandle, - tasks: JoinSet<()>, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, - // TODO: use proper stanza ids - pending_iqs: Arc>>>>, + + // all the threads spawned by the current connection session + tasks: JoinSet<()>, + + // for handling incoming stanzas + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + + // control stuff + control_receiver: mpsc::Receiver, + on_crash: oneshot::Sender, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue +pub struct ReadState { + pub supervisor_control: SupervisorSender, + // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream + pub tasks: JoinSet<()>, } impl Read { fn new( - control_receiver: mpsc::Receiver, stream: BoundJabberReader, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - db: Db, - update_sender: mpsc::Sender, - // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) - supervisor_control: mpsc::Sender, - write_handle: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + control_receiver: mpsc::Receiver, + on_crash: oneshot::Sender, ) -> Self { let (_send, recv) = oneshot::channel(); Self { - control_receiver, stream, - on_crash, - db, - update_sender, - supervisor_control, - write_handle, - tasks, disconnecting: false, disconnect_timedout: recv, - pending_iqs, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, } } @@ -110,7 +103,7 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); break; }, }; @@ -119,7 +112,7 @@ impl Read { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone())); + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); }, Err(e) => { println!("error: {:?}", e); @@ -146,8 +139,7 @@ impl Read { if self.disconnecting == true { break; } else { - // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around - let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); + let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); } break; }, @@ -158,7 +150,7 @@ impl Read { } println!("stopping read thread"); // when it aborts, must clear iq map no matter what - let mut iqs = self.pending_iqs.lock().await; + let mut iqs = self.logic.pending.lock().await; for (_id, sender) in iqs.drain() { let _ = sender.send(Err(ReadError::LostConnection)); } @@ -170,249 +162,10 @@ impl Read { // - access database // - disconnect proper, reconnect // - respond to server requests -async fn handle_stanza( - stanza: Stanza, - update_sender: mpsc::Sender, - db: Db, - supervisor_control: mpsc::Sender, - write_handle: WriteHandle, - pending_iqs: Arc>>>>, -) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; - // TODO: can this be more efficient? - let result = db - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = update_sender - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; - } - } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = update_sender - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; - } - } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = pending_iqs.lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - db.delete_contact(item.jid.clone()).await; - update_sender - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = db.upsert_contact(contact.clone()).await { - let _ = update_sender - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } - let _ = update_sender - .send(UpdateMessage::RosterUpdate(contact)) - .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); - } - } - } - } - // TODO: send unsupported to server - _ => {} - } - } else { - // TODO: send error (unsupported) to server - } - } - }, - Stanza::Error(error) => { - let _ = update_sender - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect - } - Stanza::OtherContent(content) => { - let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content))); - // TODO: send error to write_thread - } - } -} pub enum ReadControl { Disconnect, - Abort( - oneshot::Sender<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - ), + Abort(oneshot::Sender), } pub struct ReadControlHandle { @@ -437,32 +190,21 @@ impl DerefMut for ReadControlHandle { impl ReadControlHandle { pub fn new( stream: BoundJabberReader, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - db: Db, - sender: mpsc::Sender, - supervisor_control: mpsc::Sender, - jabber_write: WriteHandle, - pending_iqs: Arc>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( - control_receiver, stream, - on_crash, - db, - sender, - supervisor_control, - jabber_write, JoinSet::new(), - pending_iqs, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); @@ -474,33 +216,22 @@ impl ReadControlHandle { pub fn reconnect( stream: BoundJabberReader, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender, - JoinSet<()>, - mpsc::Sender, - WriteHandle, - Arc>>>>, - )>, - db: Db, - sender: mpsc::Sender, - supervisor_control: mpsc::Sender, - jabber_write: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( - control_receiver, stream, - on_crash, - db, - sender, - supervisor_control, - jabber_write, tasks, - pending_iqs, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs index 3333d38..ff78b81 100644 --- a/luz/src/connection/write.rs +++ b/luz/src/connection/write.rs @@ -9,12 +9,21 @@ use tokio::{ use crate::error::WriteError; -// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. pub struct Write { + stream: BoundJabberWriter, + + /// connection session write queue stanza_receiver: mpsc::Receiver, + + // control stuff control_receiver: mpsc::Receiver, - stream: BoundJabberWriter, - on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue +pub struct WriteState { + pub stanza_recv: mpsc::Receiver, } #[derive(Debug)] @@ -25,21 +34,21 @@ pub struct WriteMessage { pub enum WriteControl { Disconnect, - Abort(oneshot::Sender>), + Abort(oneshot::Sender), } impl Write { fn new( + stream: BoundJabberWriter, stanza_receiver: mpsc::Receiver, control_receiver: mpsc::Receiver, - stream: BoundJabberWriter, - supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, ) -> Self { Self { + stream, stanza_receiver, control_receiver, - stream, - on_crash: supervisor, + on_crash, } } @@ -55,7 +64,12 @@ impl Write { peanuts::Error::ReadError(_error) => { // make sure message is not lost from error, supervisor handles retry and reporting // TODO: upon reconnect, make sure we are not stuck in a reconnection loop - let _ = self.on_crash.send((retry_msg, self.stanza_receiver)); + let _ = self.on_crash.send(( + retry_msg, + WriteState { + stanza_recv: self.stanza_receiver, + }, + )); return; } _ => { @@ -106,7 +120,7 @@ impl Write { }, // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource WriteControl::Abort(sender) => { - let _ = sender.send(self.stanza_receiver); + let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); break; }, } @@ -117,7 +131,7 @@ impl Write { Err(e) => match &e { peanuts::Error::ReadError(_error) => { // make sure message is not lost from error, supervisor handles retry and reporting - let _ = self.on_crash.send((msg, self.stanza_receiver)); + let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver })); break; } _ => { @@ -190,12 +204,12 @@ impl DerefMut for WriteControlHandle { impl WriteControlHandle { pub fn new( stream: BoundJabberWriter, - supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, ) -> (WriteHandle, Self) { let (control_sender, control_receiver) = mpsc::channel(20); let (stanza_sender, stanza_receiver) = mpsc::channel(20); - let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); let handle = tokio::spawn(async move { actor.run().await }); ( @@ -211,13 +225,13 @@ impl WriteControlHandle { pub fn reconnect_retry( stream: BoundJabberWriter, - supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, - retry_msg: WriteMessage, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver, + retry_msg: WriteMessage, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); - let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); Self { @@ -228,12 +242,12 @@ impl WriteControlHandle { pub fn reconnect( stream: BoundJabberWriter, - supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, stanza_receiver: mpsc::Receiver, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); - let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); let handle = tokio::spawn(async move { actor.run().await }); Self { diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 99c96e0..3498ff1 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, ops::{Deref, DerefMut}, + str::FromStr, sync::Arc, time::Duration, }; @@ -10,12 +11,12 @@ use chrono::Utc; use connection::{write::WriteMessage, SupervisorSender}; use db::Db; use error::{ - ActorError, CommandError, ConnectionError, DatabaseError, ReadError, RosterError, StatusError, - WriteError, + ActorError, CommandError, ConnectionError, DatabaseError, IqError, MessageRecvError, + PresenceError, ReadError, RosterError, StatusError, WriteError, }; use futures::{future::Fuse, FutureExt}; use jabber::JID; -use presence::{Offline, Online, Presence, PresenceType}; +use presence::{Offline, Online, Presence, PresenceType, Show}; use roster::{Contact, ContactUpdate}; use sqlx::SqlitePool; use stanza::client::{ @@ -43,395 +44,766 @@ pub mod presence; pub mod roster; pub mod user; -pub struct Luz { - command_sender: mpsc::Sender, - receiver: mpsc::Receiver, - jid: Arc>, - // TODO: use a dyn passwordprovider trait to avoid storing password in memory - password: Arc, - connected: Arc>>, - pending_iqs: Arc>>>>, - db: Db, - sender: mpsc::Sender, - /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected - connection_supervisor_shutdown: Fuse>, - // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) - // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? - tasks: JoinSet<()>, +pub enum Command { + /// get the roster. if offline, retreive cached version from database. should be stored in application memory + GetRoster(oneshot::Sender, RosterError>>), + /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) + // TODO: paging and filtering + GetChats(oneshot::Sender, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrdered(oneshot::Sender, DatabaseError>>), + // TODO: paging and filtering + GetChatsOrderedWithLatestMessages(oneshot::Sender, DatabaseError>>), + /// get a specific chat by jid + GetChat(JID, oneshot::Sender>), + /// get message history for chat (does appropriate mam things) + // TODO: paging and filtering + GetMessages(JID, oneshot::Sender, DatabaseError>>), + /// delete a chat from your chat history, along with all the corresponding messages + DeleteChat(JID, oneshot::Sender>), + /// delete a message from your chat history + DeleteMessage(Uuid, oneshot::Sender>), + /// get a user from your users database + GetUser(JID, oneshot::Sender>), + /// add a contact to your roster, with a status of none, no subscriptions. + AddContact(JID, oneshot::Sender>), + /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. + BuddyRequest(JID, oneshot::Sender>), + /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. + SubscriptionRequest(JID, oneshot::Sender>), + /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. + AcceptBuddyRequest(JID, oneshot::Sender>), + /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. + AcceptSubscriptionRequest(JID, oneshot::Sender>), + /// unsubscribe to a contact, but don't remove their subscription. + UnsubscribeFromContact(JID, oneshot::Sender>), + /// stop a contact from being subscribed, but stay subscribed to the contact. + UnsubscribeContact(JID, oneshot::Sender>), + /// remove subscriptions to and from contact, but keep in roster. + UnfriendContact(JID, oneshot::Sender>), + /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. + DeleteContact(JID, oneshot::Sender>), + /// update contact. contact details will be overwritten with the contents of the contactupdate struct. + UpdateContact(JID, ContactUpdate, oneshot::Sender>), + /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. + SetStatus(Online, oneshot::Sender>), + /// send presence stanza + // TODO: cache presence stanza + SendPresence( + Option, + PresenceType, + oneshot::Sender>, + ), + /// send a directed presence (usually to a non-contact). + // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) + /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a + /// chatroom). if disconnected, will be cached so when client connects, message will be sent. + SendMessage(JID, Body, oneshot::Sender>), } -impl Luz { - fn new( - command_sender: mpsc::Sender, - receiver: mpsc::Receiver, - jid: Arc>, - password: String, - connected: Arc>>, - connection_supervisor_shutdown: Fuse>, - db: Db, - sender: mpsc::Sender, - ) -> Self { - Self { - jid, - password: Arc::new(password), - connected, - db, - receiver, - sender, - tasks: JoinSet::new(), - connection_supervisor_shutdown, - pending_iqs: Arc::new(Mutex::new(HashMap::new())), - command_sender, - } +#[derive(Debug)] +pub struct Client { + sender: mpsc::Sender, + timeout: Duration, +} + +impl Clone for Client { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + timeout: self.timeout, + } + } +} + +impl Deref for Client { + type Target = mpsc::Sender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl Client { + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(LuzMessage::Connect).await?; + Ok(()) + } + + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(LuzMessage::Disconnect).await?; + Ok(()) + } + + pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver) { + let (command_sender, command_receiver) = mpsc::channel(20); + let (update_send, update_recv) = mpsc::channel(20); + + // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) + let (_sup_send, sup_recv) = oneshot::channel(); + let sup_recv = sup_recv.fuse(); + + let logic = LogicState { + db, + pending: Arc::new(Mutex::new(HashMap::new())), + update_sender: update_send, + }; + + let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic); + tokio::spawn(async move { actor.run().await }); + + ( + Self { + sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), + }, + update_recv, + ) + } + + pub async fn get_roster(&self) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetRoster(send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let roster = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(roster) + } + + pub async fn get_chats(&self) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetChats(send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered(&self) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetChatsOrdered(send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chats) + } + + pub async fn get_chats_ordered_with_latest_messages( + &self, + ) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command( + Command::GetChatsOrderedWithLatestMessages(send), + )) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chats) + } + + pub async fn get_chat(&self, jid: JID) -> Result> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chat = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chat) + } + + pub async fn get_messages( + &self, + jid: JID, + ) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetMessages(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let messages = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(messages) + } + + pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::DeleteChat(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::DeleteMessage(id, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn get_user(&self, jid: JID) -> Result> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::GetUser(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::AddContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::BuddyRequest(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::SubscriptionRequest(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::AcceptBuddyRequest(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn accept_subscription_request( + &self, + jid: JID, + ) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::AcceptSubscriptionRequest( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::UnsubscribeFromContact( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) } - async fn run(mut self) { - loop { - let msg = tokio::select! { - // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy - // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? - _ = &mut self.connection_supervisor_shutdown => { - *self.connected.lock().await = None; - continue; - } - Some(msg) = self.receiver.recv() => { - msg - }, - else => break, - }; - // TODO: consider separating disconnect/connect and commands apart from commandmessage - // TODO: dispatch commands separate tasks - match msg { - CommandMessage::Connect => { - let mut connection_lock = self.connected.lock().await; - match connection_lock.as_ref() { - Some(_) => { - self.sender - .send(UpdateMessage::Error(Error::AlreadyConnected)) + pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::UnsubscribeContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::UnfriendContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::DeleteContact(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn update_contact( + &self, + jid: JID, + update: ContactUpdate, + ) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::UpdateContact( + jid, update, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn set_status(&self, online: Online) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::SetStatus(online, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } + + pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(LuzMessage::Command(Command::SendMessage(jid, body, send))) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } +} + +#[derive(Clone)] +pub struct LogicState { + db: Db, + pending: Arc>>>>, + update_sender: mpsc::Sender, +} + +impl LogicState { + pub async fn handle_connect(self, connection: Connected) { + let (send, recv) = oneshot::channel(); + debug!("getting roster"); + self.clone() + .handle_online(Command::GetRoster(send), connection.clone()) + .await; + debug!("sent roster req"); + let roster = recv.await; + debug!("got roster"); + match roster { + Ok(r) => match r { + Ok(roster) => { + let online = self.db.read_cached_status().await; + let online = match online { + Ok(online) => online, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionError::StatusCacheError(e.into()), + ))) .await; + Online::default() } - None => { - let streams_result; - { - let mut jid = self.jid.lock().await; - let mut domain = jid.domainpart.clone(); - // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) - streams_result = jabber::connect_and_login( - &mut jid, - &*self.password, - &mut domain, - ) - .await; - debug!("connected and logged in as {}", jid); - debug!("test"); + }; + let (send, recv) = oneshot::channel(); + self.clone() + .handle_online( + Command::SendPresence(None, PresenceType::Online(online.clone()), send), + connection, + ) + .await; + let set_status = recv.await; + match set_status { + Ok(s) => match s { + Ok(()) => { + let _ = self + .update_sender + .send(UpdateMessage::Online(online, roster)) + .await; } - match streams_result { - Ok(s) => { - debug!("ok stream result"); - let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); - let (writer, supervisor) = SupervisorHandle::new( - s, - self.sender.clone(), - self.db.clone(), - shutdown_send, - self.jid.clone(), - self.password.clone(), - self.pending_iqs.clone(), - ); - let shutdown_recv = shutdown_recv.fuse(); - self.connection_supervisor_shutdown = shutdown_recv; - // TODO: get roster and send initial presence - let (send, recv) = oneshot::channel(); - debug!("getting roster"); - CommandMessage::GetRoster(send) - .handle_online( - writer.clone(), - supervisor.sender(), - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - self.pending_iqs.clone(), - ) - .await; - debug!("sent roster req"); - let roster = recv.await; - debug!("got roster"); - match roster { - Ok(r) => { - match r { - Ok(roster) => { - let online = self.db.read_cached_status().await; - let online = match online { - Ok(online) => online, - Err(e) => { - let _ = self - .sender - .send(UpdateMessage::Error( - Error::Connecting( - ConnectionError::StatusCacheError( - e.into(), - ), - ), - )) - .await; - Online::default() - } - }; - let (send, recv) = oneshot::channel(); - CommandMessage::SendPresence( - None, - PresenceType::Online(online.clone()), - send, - ) - .handle_online( - writer.clone(), - supervisor.sender(), - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - self.pending_iqs.clone(), - ) - .await; - let set_status = recv.await; - match set_status { - Ok(s) => match s { - Ok(()) => { - *connection_lock = - Some((writer, supervisor)); - let _ = self - .sender - .send(UpdateMessage::Online( - online, roster, - )) - .await; - continue; - } - Err(e) => { - let _ = self - .sender - .send(UpdateMessage::Error( - Error::Connecting(e.into()), - )) - .await; - } - }, - Err(e) => { - let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await; - } - } - } - Err(e) => { - let _ = self - .sender - .send(UpdateMessage::Error( - Error::Connecting(e.into()), - )) - .await; - } - } - } - Err(e) => { - let _ = self - .sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionError::RosterRetreival( - RosterError::Write(WriteError::Actor( - e.into(), - )), - ), - ))) - .await; - } - } - } - Err(e) => { - tracing::error!("error: {}", e); - let _ = - self.sender.send(UpdateMessage::Error(Error::Connecting( - ConnectionError::ConnectionFailed(e.into()), - ))); - } + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; } - } - }; - } - CommandMessage::Disconnect(offline) => { - match self.connected.lock().await.as_mut() { - None => { + }, + Err(e) => { let _ = self - .sender - .send(UpdateMessage::Error(Error::AlreadyDisconnected)) + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionError::SendPresence(WriteError::Actor(e.into())), + ))) .await; } - mut c => { - if let Some((write_handle, supervisor_handle)) = c.take() { - let offline_presence: stanza::client::presence::Presence = - offline.clone().into_stanza(None); - let stanza = Stanza::Presence(offline_presence); - // TODO: timeout and error check - write_handle.write(stanza).await; - let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; - let _ = self.sender.send(UpdateMessage::Offline(offline)).await; - } else { - unreachable!() - }; - } } } - _ => { - match self.connected.lock().await.as_ref() { - Some((w, s)) => self.tasks.spawn(msg.handle_online( - w.clone(), - s.sender(), - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - self.pending_iqs.clone(), - )), - None => self.tasks.spawn(msg.handle_offline( - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - )), - }; + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; } + }, + Err(e) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Connecting( + ConnectionError::RosterRetreival(RosterError::Write(WriteError::Actor( + e.into(), + ))), + ))) + .await; } } } -} -impl CommandMessage { - pub async fn handle_offline( + pub async fn handle_disconnect(self, connection: Connected) { + // TODO: be able to set offline status message + let offline_presence: stanza::client::presence::Presence = + Offline::default().into_stanza(None); + let stanza = Stanza::Presence(offline_presence); + // TODO: timeout and error check + connection.write_handle.write(stanza).await; + let _ = self + .update_sender + .send(UpdateMessage::Offline(Offline::default())) + .await; + } + + /// stanza errors (recoverable) + pub async fn handle_error(self, error: Error) {} + + pub async fn handle_stanza( self, - jid: Arc>, - db: Db, - update_sender: mpsc::Sender, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, ) { - match self { - CommandMessage::Connect => unreachable!(), - CommandMessage::Disconnect(offline) => unreachable!(), - CommandMessage::GetRoster(sender) => { - let roster = db.read_cached_roster().await; - match roster { - Ok(roster) => { - let _ = sender.send(Ok(roster)); - } - Err(e) => { - let _ = sender.send(Err(RosterError::Cache(e.into()))); + match stanza { + Stanza::Message(stanza_message) => { + if let Some(mut from) = stanza_message.from { + // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. + let timestamp = stanza_message + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + // TODO: group chat messages + let mut message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + timestamp, + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = self + .db + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + tracing::error!("messagecreate"); + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MessageHistory(e.into()), + ))) + .await; } + message.from = message.from.as_bare(); + from = from.as_bare(); + let _ = self + .update_sender + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MissingFrom, + ))) + .await; } } - CommandMessage::GetChats(sender) => { - let chats = db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - CommandMessage::GetChatsOrdered(sender) => { - let chats = db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - CommandMessage::GetChatsOrderedWithLatestMessages(sender) => { - let chats = db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - CommandMessage::GetChat(jid, sender) => { - let chats = db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - CommandMessage::GetMessages(jid, sender) => { - let messages = db.read_message_history(jid).await.map_err(|e| e.into()); - sender.send(messages); - } - CommandMessage::DeleteChat(jid, sender) => { - let result = db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - CommandMessage::DeleteMessage(uuid, sender) => { - let result = db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - CommandMessage::GetUser(jid, sender) => { - let user = db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - CommandMessage::AddContact(jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - CommandMessage::BuddyRequest(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::SubscriptionRequest(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::AcceptBuddyRequest(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::AcceptSubscriptionRequest(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::UnsubscribeFromContact(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::UnsubscribeContact(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::UnfriendContact(jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - CommandMessage::DeleteContact(jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - CommandMessage::UpdateContact(jid, contact_update, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - CommandMessage::SetStatus(online, sender) => { - let result = db - .upsert_cached_status(online) - .await - .map_err(|e| StatusError::Cache(e.into())); - sender.send(result); + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + ), + ))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = self + .update_sender + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Offline(offline), + }, + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = self + .update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Online(online), + }, + }) + .await; + } + } + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } } - // TODO: offline message queue - CommandMessage::SendMessage(jid, body, sender) => { - sender.send(Err(WriteError::Disconnected)); + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = self.pending.lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + self.db.delete_contact(item.jid.clone()).await; + self.update_sender + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = + self.db.upsert_contact(contact.clone()).await + { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), + ))) + .await; + } + let _ = self + .update_sender + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect } - CommandMessage::SendPresence(jid, presence, sender) => { - sender.send(Err(WriteError::Disconnected)); + Stanza::OtherContent(content) => { + let _ = self + .update_sender + .send(UpdateMessage::Error(Error::UnrecognizedContent(content))); + // TODO: send error to write_thread } } } - pub async fn handle_online( - mut self, - write_handle: WriteHandle, - supervisor_control: SupervisorSender, - // TODO: jid could lose resource by the end - client_jid: Arc>, - db: Db, - update_sender: mpsc::Sender, - pending_iqs: Arc>>>>, - ) { - match self { - CommandMessage::Connect => unreachable!(), - CommandMessage::Disconnect(_) => unreachable!(), - CommandMessage::GetRoster(result_sender) => { + // pub async fn handle_stream_error(self, error) {} + pub async fn handle_online(self, command: Command, connection: Connected) { + match command { + Command::GetRoster(result_sender) => { // TODO: jid resource should probably be stored within the connection - let owned_jid: JID; debug!("before client_jid lock"); - { - owned_jid = client_jid.lock().await.clone(); - } debug!("after client_jid lock"); let iq_id = Uuid::new_v4().to_string(); let (send, iq_recv) = oneshot::channel(); { - pending_iqs.lock().await.insert(iq_id.clone(), send); + self.pending.lock().await.insert(iq_id.clone(), send); } let stanza = Stanza::Iq(Iq { - from: Some(owned_jid), + from: Some(connection.jid), id: iq_id.to_string(), to: None, r#type: IqType::Get, @@ -443,7 +815,8 @@ impl CommandMessage { errors: Vec::new(), }); let (send, recv) = oneshot::channel(); - let _ = write_handle + let _ = connection + .write_handle .send(WriteMessage { stanza, respond_to: send, @@ -477,8 +850,8 @@ impl CommandMessage { }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec = items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = db.replace_cached_roster(contacts.clone()).await { - update_sender + if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { + self.update_sender .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( e.into(), )))) @@ -518,50 +891,51 @@ impl CommandMessage { } } } - CommandMessage::GetChats(sender) => { - let chats = db.read_chats().await.map_err(|e| e.into()); + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); sender.send(chats); } - CommandMessage::GetChatsOrdered(sender) => { - let chats = db.read_chats_ordered().await.map_err(|e| e.into()); + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); sender.send(chats); } - CommandMessage::GetChatsOrderedWithLatestMessages(sender) => { - let chats = db + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db .read_chats_ordered_with_latest_messages() .await .map_err(|e| e.into()); sender.send(chats); } - CommandMessage::GetChat(jid, sender) => { - let chats = db.read_chat(jid).await.map_err(|e| e.into()); + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); sender.send(chats); } - CommandMessage::GetMessages(jid, sender) => { - let messages = db.read_message_history(jid).await.map_err(|e| e.into()); + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); sender.send(messages); } - CommandMessage::DeleteChat(jid, sender) => { - let result = db.delete_chat(jid).await.map_err(|e| e.into()); + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); sender.send(result); } - CommandMessage::DeleteMessage(uuid, sender) => { - let result = db.delete_message(uuid).await.map_err(|e| e.into()); + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); sender.send(result); } - CommandMessage::GetUser(jid, sender) => { - let user = db.read_user(jid).await.map_err(|e| e.into()); + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); sender.send(user); } // TODO: offline queue to modify roster - CommandMessage::AddContact(jid, sender) => { - let owned_jid; - { - owned_jid = client_jid.lock().await.clone(); - } + Command::AddContact(jid, sender) => { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { - from: Some(owned_jid), + from: Some(connection.jid), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -581,10 +955,10 @@ impl CommandMessage { }); let (send, recv) = oneshot::channel(); { - pending_iqs.lock().await.insert(iq_id.clone(), send); + self.pending.lock().await.insert(iq_id.clone(), send); } // TODO: write_handle send helper function - let result = write_handle.write(set_stanza).await; + let result = connection.write_handle.write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; @@ -637,7 +1011,7 @@ impl CommandMessage { } } } - CommandMessage::BuddyRequest(jid, sender) => { + Command::BuddyRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -650,7 +1024,7 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; match result { Err(_) => { let _ = sender.send(result); @@ -668,12 +1042,12 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } } } - CommandMessage::SubscriptionRequest(jid, sender) => { + Command::SubscriptionRequest(jid, sender) => { // TODO: i should probably have builders let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, @@ -687,10 +1061,10 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } - CommandMessage::AcceptBuddyRequest(jid, sender) => { + Command::AcceptBuddyRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -703,7 +1077,7 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; match result { Err(_) => { let _ = sender.send(result); @@ -721,12 +1095,12 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } } } - CommandMessage::AcceptSubscriptionRequest(jid, sender) => { + Command::AcceptSubscriptionRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -739,10 +1113,10 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } - CommandMessage::UnsubscribeFromContact(jid, sender) => { + Command::UnsubscribeFromContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -755,10 +1129,10 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } - CommandMessage::UnsubscribeContact(jid, sender) => { + Command::UnsubscribeContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -771,10 +1145,10 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } - CommandMessage::UnfriendContact(jid, sender) => { + Command::UnfriendContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, @@ -787,7 +1161,7 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; match result { Err(_) => { let _ = sender.send(result); @@ -805,19 +1179,15 @@ impl CommandMessage { errors: Vec::new(), delay: None, }); - let result = write_handle.write(presence).await; + let result = connection.write_handle.write(presence).await; let _ = sender.send(result); } } } - CommandMessage::DeleteContact(jid, sender) => { - let owned_jid; - { - owned_jid = client_jid.lock().await.clone(); - } + Command::DeleteContact(jid, sender) => { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { - from: Some(owned_jid), + from: Some(connection.jid), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -837,9 +1207,9 @@ impl CommandMessage { }); let (send, recv) = oneshot::channel(); { - pending_iqs.lock().await.insert(iq_id.clone(), send); + self.pending.lock().await.insert(iq_id.clone(), send); } - let result = write_handle.write(set_stanza).await; + let result = connection.write_handle.write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; @@ -892,11 +1262,7 @@ impl CommandMessage { } } } - CommandMessage::UpdateContact(jid, contact_update, sender) => { - let owned_jid; - { - owned_jid = client_jid.lock().await.clone(); - } + Command::UpdateContact(jid, contact_update, sender) => { let iq_id = Uuid::new_v4().to_string(); let groups = Vec::from_iter( contact_update @@ -905,7 +1271,7 @@ impl CommandMessage { .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { - from: Some(owned_jid), + from: Some(connection.jid), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -925,9 +1291,9 @@ impl CommandMessage { }); let (send, recv) = oneshot::channel(); { - pending_iqs.lock().await.insert(iq_id.clone(), send); + self.pending.lock().await.insert(iq_id.clone(), send); } - let result = write_handle.write(set_stanza).await; + let result = connection.write_handle.write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; @@ -980,16 +1346,18 @@ impl CommandMessage { } } } - CommandMessage::SetStatus(online, sender) => { - let result = db.upsert_cached_status(online.clone()).await; + Command::SetStatus(online, sender) => { + let result = self.db.upsert_cached_status(online.clone()).await; if let Err(e) = result { - let _ = update_sender + let _ = self + .update_sender .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( e.into(), )))) .await; } - let result = write_handle + let result = connection + .write_handle .write(Stanza::Presence(online.into_stanza(None))) .await .map_err(|e| StatusError::Write(e)); @@ -997,15 +1365,10 @@ impl CommandMessage { let _ = sender.send(result); } // TODO: offline message queue - CommandMessage::SendMessage(jid, body, sender) => { + Command::SendMessage(jid, body, sender) => { let id = Uuid::new_v4(); - let owned_jid: JID; - { - // TODO: timeout - owned_jid = client_jid.lock().await.clone(); - } let message = Stanza::Message(stanza::client::message::Message { - from: Some(owned_jid.clone()), + from: Some(connection.jid.clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type @@ -1022,17 +1385,18 @@ impl CommandMessage { }); let _ = sender.send(Ok(())); // let _ = sender.send(Ok(message.clone())); - let result = write_handle.write(message).await; + let result = connection.write_handle.write(message).await; match result { Ok(_) => { let mut message = Message { id, - from: owned_jid, + from: connection.jid, body, timestamp: Utc::now(), }; info!("send message {:?}", message); - if let Err(e) = db + if let Err(e) = self + .db .create_message_with_self_resource_and_chat( message.clone(), jid.clone(), @@ -1041,13 +1405,16 @@ impl CommandMessage { .map_err(|e| e.into()) { tracing::error!("{}", e); - let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend( - error::MessageSendError::MessageHistory(e), - ))); + let _ = + self.update_sender + .send(UpdateMessage::Error(Error::MessageSend( + error::MessageSendError::MessageHistory(e), + ))); } // TODO: don't do this, have separate from from details message.from = message.from.as_bare(); - let _ = update_sender + let _ = self + .update_sender .send(UpdateMessage::Message { to: jid, message }) .await; } @@ -1056,417 +1423,344 @@ impl CommandMessage { } } } - CommandMessage::SendPresence(jid, presence, sender) => { + Command::SendPresence(jid, presence, sender) => { let mut presence: stanza::client::presence::Presence = presence.into(); if let Some(jid) = jid { presence.to = Some(jid); }; - let result = write_handle.write(Stanza::Presence(presence)).await; + let result = connection + .write_handle + .write(Stanza::Presence(presence)) + .await; // .map_err(|e| StatusError::Write(e)); let _ = sender.send(result); } } } -} -// TODO: separate sender and receiver, store handle to Luz process to ensure dropping -// #[derive(Clone)] -#[derive(Debug)] -pub struct LuzHandle { - sender: mpsc::Sender, - timeout: Duration, -} - -impl Clone for LuzHandle { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - timeout: self.timeout, + pub async fn handle_offline(self, command: Command) { + match command { + Command::GetRoster(sender) => { + let roster = self.db.read_cached_roster().await; + match roster { + Ok(roster) => { + let _ = sender.send(Ok(roster)); + } + Err(e) => { + let _ = sender.send(Err(RosterError::Cache(e.into()))); + } + } + } + Command::GetChats(sender) => { + let chats = self.db.read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = self + .db + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = self + .db + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = self.db.read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::BuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptBuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptSubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeFromContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnfriendContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::DeleteContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::UpdateContact(_jid, _contact_update, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::SetStatus(online, sender) => { + let result = self + .db + .upsert_cached_status(online) + .await + .map_err(|e| StatusError::Cache(e.into())); + sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(_jid, _body, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SendPresence(_jid, _presence, sender) => { + sender.send(Err(WriteError::Disconnected)); + } } } } -impl Deref for LuzHandle { - type Target = mpsc::Sender; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for LuzHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } +#[derive(Clone)] +pub struct Connected { + // full jid will stay stable across reconnections + jid: JID, + write_handle: WriteHandle, } -impl LuzHandle { - // TODO: database creation separate - pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver) { - let (command_sender, command_receiver) = mpsc::channel(20); - let (update_sender, update_receiver) = mpsc::channel(20); - // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) - let (sup_send, sup_recv) = oneshot::channel(); - let mut sup_recv = sup_recv.fuse(); - - let actor = Luz::new( - command_sender.clone(), - command_receiver, - Arc::new(Mutex::new(jid)), - password, - Arc::new(Mutex::new(None)), - sup_recv, - db, - update_sender, - ); - tokio::spawn(async move { actor.run().await }); - - ( - Self { - sender: command_sender, - // TODO: configure timeout - timeout: Duration::from_secs(10), - }, - update_receiver, - ) - } - - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(CommandMessage::Connect).await?; - Ok(()) - } - - pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { - self.send(CommandMessage::Disconnect(offline)).await?; - Ok(()) - } - - pub async fn get_roster(&self) -> Result, CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetRoster(send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let roster = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(roster) - } - - pub async fn get_chats(&self) -> Result, CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetChats(send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(chats) - } - - pub async fn get_chats_ordered(&self) -> Result, CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetChatsOrdered(send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(chats) - } - - pub async fn get_chats_ordered_with_latest_messages( - &self, - ) -> Result, CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetChatsOrderedWithLatestMessages(send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let chats = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(chats) - } +pub trait Logic {} - pub async fn get_chat(&self, jid: JID) -> Result> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetChat(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let chat = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(chat) - } +pub struct Luz { + jid: JID, + password: Arc, + receiver: mpsc::Receiver, + // TODO: use a dyn passwordprovider trait to avoid storing password in memory + connected: Option<(Connected, SupervisorHandle)>, + // connected_intention: bool, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: Fuse>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? + // TODO: genericize + logic: LogicState, + // config: LampConfig, + tasks: JoinSet<()>, +} - pub async fn get_messages( - &self, +impl Luz { + fn new( jid: JID, - ) -> Result, CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetMessages(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let messages = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(messages) + password: String, + receiver: mpsc::Receiver, + connected: Option<(Connected, SupervisorHandle)>, + connection_supervisor_shutdown: Fuse>, + logic: LogicState, + ) -> Self { + Self { + jid, + password: Arc::new(password), + connected, + receiver, + connection_supervisor_shutdown, + logic, + tasks: JoinSet::new(), + } } - pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::DeleteChat(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } + async fn run(mut self) { + loop { + let msg = tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? + _ = &mut self.connection_supervisor_shutdown => { + self.connected = None; + continue; + } + Some(msg) = self.receiver.recv() => { + msg + }, + else => break, + }; + // TODO: consider separating disconnect/connect and commands apart from commandmessage + // TODO: dispatch commands separate tasks + match msg { + LuzMessage::Connect => { + match self.connected { + Some(_) => { + self.logic + .clone() + .update_sender + .send(UpdateMessage::Error(Error::AlreadyConnected)) + .await; + } + None => { + let mut jid = self.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + jabber::connect_and_login(&mut jid, &*self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + debug!("ok stream result"); + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + shutdown_send, + jid.clone(), + self.password.clone(), + self.logic.clone(), + ); - pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::DeleteMessage(id, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } + let shutdown_recv = shutdown_recv.fuse(); + self.connection_supervisor_shutdown = shutdown_recv; - pub async fn get_user(&self, jid: JID) -> Result> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::GetUser(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } + let connected = Connected { + jid, + write_handle: writer, + }; - pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::AddContact(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } + self.logic.clone().handle_connect(connected.clone()).await; - pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::BuddyRequest(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) + self.connected = Some((connected, supervisor)); + } + Err(e) => { + tracing::error!("error: {}", e); + let _ = self.logic.clone().update_sender.send( + UpdateMessage::Error(Error::Connecting( + ConnectionError::ConnectionFailed(e.into()), + )), + ); + } + } + } + }; + } + LuzMessage::Disconnect => match self.connected { + None => { + let _ = self + .logic + .update_sender + .send(UpdateMessage::Error(Error::AlreadyDisconnected)) + .await; + } + ref mut c => { + if let Some((connected, supervisor_handle)) = c.take() { + // TODO: better disconnect logic, only reflect once actually disconnected + // TODO: call within supervisor instead + self.logic + .clone() + .handle_disconnect(connected.clone()) + .await; + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + LuzMessage::Command(command) => { + match self.connected.as_ref() { + Some((w, s)) => self + .tasks + .spawn(self.logic.clone().handle_online(command, w.clone())), + None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + }; + } + } + } } +} - pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::SubscriptionRequest(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } +// TODO: separate sender and receiver, store handle to Luz process to ensure dropping +// #[derive(Clone)] +#[derive(Debug)] +pub struct LuzHandle { + sender: mpsc::Sender, + timeout: Duration, +} - pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::AcceptBuddyRequest(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) +impl Clone for LuzHandle { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + timeout: self.timeout, + } } +} - pub async fn accept_subscription_request( - &self, - jid: JID, - ) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::AcceptSubscriptionRequest(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } +impl Deref for LuzHandle { + type Target = mpsc::Sender; - pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::UnsubscribeFromContact(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) + fn deref(&self) -> &Self::Target { + &self.sender } +} - pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::UnsubscribeContact(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) +impl DerefMut for LuzHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender } +} - pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::UnfriendContact(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } +impl LuzHandle { + // TODO: database creation separate + pub fn new(jid: JID, password: String, logic: LogicState) -> Self { + let (command_sender, command_receiver) = mpsc::channel(20); - pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::DeleteContact(jid, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) - } + // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) + let (_sup_send, sup_recv) = oneshot::channel(); + let sup_recv = sup_recv.fuse(); - pub async fn update_contact( - &self, - jid: JID, - update: ContactUpdate, - ) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::UpdateContact(jid, update, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) + let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic); + tokio::spawn(async move { actor.run().await }); + + Self { + sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), + } } - pub async fn set_status(&self, online: Online) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::SetStatus(online, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(LuzMessage::Connect).await?; + Ok(()) } - pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError> { - let (send, recv) = oneshot::channel(); - self.send(CommandMessage::SendMessage(jid, body, send)) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))?; - let result = timeout(self.timeout, recv) - .await - .map_err(|e| CommandError::Actor(Into::::into(e)))? - .map_err(|e| CommandError::Actor(Into::::into(e)))??; - Ok(result) + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(LuzMessage::Disconnect).await?; + Ok(()) } } // TODO: generate methods for each with a macro -pub enum CommandMessage { +pub enum LuzMessage { // TODO: login invisible xep-0186 /// connect to XMPP chat server. gets roster and publishes initial presence. Connect, /// disconnect from XMPP chat server, sending unavailable presence then closing stream. - Disconnect(Offline), - /// get the roster. if offline, retreive cached version from database. should be stored in application memory - GetRoster(oneshot::Sender, RosterError>>), - /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) - // TODO: paging and filtering - GetChats(oneshot::Sender, DatabaseError>>), - // TODO: paging and filtering - GetChatsOrdered(oneshot::Sender, DatabaseError>>), - // TODO: paging and filtering - GetChatsOrderedWithLatestMessages(oneshot::Sender, DatabaseError>>), - /// get a specific chat by jid - GetChat(JID, oneshot::Sender>), - /// get message history for chat (does appropriate mam things) - // TODO: paging and filtering - GetMessages(JID, oneshot::Sender, DatabaseError>>), - /// delete a chat from your chat history, along with all the corresponding messages - DeleteChat(JID, oneshot::Sender>), - /// delete a message from your chat history - DeleteMessage(Uuid, oneshot::Sender>), - /// get a user from your users database - GetUser(JID, oneshot::Sender>), - /// add a contact to your roster, with a status of none, no subscriptions. - AddContact(JID, oneshot::Sender>), - /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. - BuddyRequest(JID, oneshot::Sender>), - /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. - SubscriptionRequest(JID, oneshot::Sender>), - /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. - AcceptBuddyRequest(JID, oneshot::Sender>), - /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. - AcceptSubscriptionRequest(JID, oneshot::Sender>), - /// unsubscribe to a contact, but don't remove their subscription. - UnsubscribeFromContact(JID, oneshot::Sender>), - /// stop a contact from being subscribed, but stay subscribed to the contact. - UnsubscribeContact(JID, oneshot::Sender>), - /// remove subscriptions to and from contact, but keep in roster. - UnfriendContact(JID, oneshot::Sender>), - /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. - DeleteContact(JID, oneshot::Sender>), - /// update contact. contact details will be overwritten with the contents of the contactupdate struct. - UpdateContact(JID, ContactUpdate, oneshot::Sender>), - /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. - SetStatus(Online, oneshot::Sender>), - /// send presence stanza - // TODO: cache presence stanza - SendPresence( - Option, - PresenceType, - oneshot::Sender>, - ), - /// send a directed presence (usually to a non-contact). - // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) - /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a - /// chatroom). if disconnected, will be cached so when client connects, message will be sent. - SendMessage(JID, Body, oneshot::Sender>), + Disconnect, + /// TODO: generics + Command(Command), } #[derive(Debug, Clone)] diff --git a/luz/src/main.rs b/luz/src/main.rs index 2bfd086..5aeef14 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -1,7 +1,7 @@ use std::{path::Path, str::FromStr, time::Duration}; use jid::JID; -use luz::{db::Db, CommandMessage, LuzHandle}; +use luz::{db::Db, LuzHandle, LuzMessage}; use sqlx::SqlitePool; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -24,11 +24,11 @@ async fn main() { } }); - luz.send(CommandMessage::Connect).await.unwrap(); + luz.send(LuzMessage::Connect).await.unwrap(); let (send, recv) = oneshot::channel(); tokio::time::sleep(Duration::from_secs(5)).await; info!("sending message"); - luz.send(CommandMessage::SendMessage( + luz.send(LuzMessage::SendMessage( JID::from_str("cel@blos.sm").unwrap(), luz::chat::Body { body: "hallo!!!".to_string(), -- cgit