diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 183 |
1 files changed, 160 insertions, 23 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 7800d56..edc6cdb 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,3 +1,8 @@ +use std::{ + ops::{Deref, DerefMut}, + time::Duration, +}; + use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use sqlx::SqlitePool; use stanza::client::Stanza; @@ -6,7 +11,7 @@ use tokio::{ task::{JoinHandle, JoinSet}, }; -use crate::UpdateMessage; +use crate::{error::Error, UpdateMessage}; use super::{ write::{WriteHandle, WriteMessage}, @@ -17,25 +22,41 @@ pub struct Read { // TODO: place iq hashmap here control_receiver: mpsc::Receiver<ReadControl>, stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + on_crash: oneshot::Sender<( + SqlitePool, + mpsc::Sender<UpdateMessage>, + JoinSet<()>, + mpsc::Sender<SupervisorCommand>, + WriteHandle, + )>, db: SqlitePool, update_sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, write_handle: WriteHandle, tasks: JoinSet<()>, + disconnecting: bool, + disconnect_timedout: oneshot::Receiver<()>, } impl Read { fn new( control_receiver: mpsc::Receiver<ReadControl>, stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + on_crash: oneshot::Sender<( + SqlitePool, + mpsc::Sender<UpdateMessage>, + JoinSet<()>, + mpsc::Sender<SupervisorCommand>, + WriteHandle, + )>, db: SqlitePool, update_sender: mpsc::Sender<UpdateMessage>, // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) supervisor_control: mpsc::Sender<SupervisorCommand>, - write_sender: WriteHandle, + write_handle: WriteHandle, + tasks: JoinSet<()>, ) -> Self { + let (send, recv) = oneshot::channel(); Self { control_receiver, stream, @@ -43,26 +64,73 @@ impl Read { db, update_sender, supervisor_control, - write_handle: write_sender, - tasks: JoinSet::new(), + write_handle, + tasks, + disconnecting: false, + disconnect_timedout: recv, } } async fn run(mut self) { loop { tokio::select! { + // if still haven't received the end tag in time, just kill itself + _ = &mut self.disconnect_timedout => { + break; + } Some(msg) = self.control_receiver.recv() => { match msg { - ReadControl::Disconnect => todo!(), - ReadControl::Abort(sender) => todo!(), + // when disconnect received, + ReadControl::Disconnect => { + let (send, recv) = oneshot::channel(); + self.disconnect_timedout = recv; + self.disconnecting = true; + tokio::spawn(async { + tokio::time::sleep(Duration::from_secs(10)).await; + let _ = send.send(()); + }) + }, + ReadControl::Abort(sender) => { + let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); + break; + }, }; }, stanza = self.stream.read::<Stanza>() => { match stanza { - Ok(_) => todo!(), - Err(_) => todo!(), + Ok(s) => { + self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); + }, + Err(e) => { + // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true + // match e { + // peanuts::Error::ReadError(error) => todo!(), + // peanuts::Error::Utf8Error(utf8_error) => todo!(), + // peanuts::Error::ParseError(_) => todo!(), + // peanuts::Error::EntityProcessError(_) => todo!(), + // peanuts::Error::InvalidCharRef(_) => todo!(), + // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), + // peanuts::Error::DuplicateAttribute(_) => todo!(), + // peanuts::Error::UnqualifiedNamespace(_) => todo!(), + // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), + // peanuts::Error::NotInElement(_) => todo!(), + // peanuts::Error::ExtraData(_) => todo!(), + // peanuts::Error::UndeclaredNamespace(_) => todo!(), + // peanuts::Error::IncorrectName(name) => todo!(), + // peanuts::Error::DeserializeError(_) => todo!(), + // peanuts::Error::Deserialize(deserialize_error) => todo!(), + // peanuts::Error::RootElementEnded => todo!(), + // } + // TODO: make sure this only happens when an end tag is received + if self.disconnecting == true { + break; + } else { + // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around + let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); + } + break; + }, } - self.tasks.spawn(); }, else => break } @@ -70,34 +138,102 @@ impl Read { } } -trait Task { - async fn handle(); -} - -impl Task for Stanza { - async fn handle() { - todo!() - } +// what do stanza processes do? +// - update ui +// - access database +// - disconnect proper, reconnect +// - respond to server requests +async fn handle_stanza( + stanza: Stanza, + update_sender: mpsc::Sender<UpdateMessage>, + db: SqlitePool, + supervisor_control: mpsc::Sender<SupervisorCommand>, + write_handle: WriteHandle, +) { + todo!() } -enum ReadControl { +pub enum ReadControl { Disconnect, - Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>), + Abort( + oneshot::Sender<( + SqlitePool, + mpsc::Sender<UpdateMessage>, + JoinSet<()>, + mpsc::Sender<SupervisorCommand>, + WriteHandle, + )>, + ), } pub struct ReadControlHandle { sender: mpsc::Sender<ReadControl>, - handle: JoinHandle<()>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for ReadControlHandle { + type Target = mpsc::Sender<ReadControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for ReadControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } } impl ReadControlHandle { pub fn new( stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + on_crash: oneshot::Sender<( + SqlitePool, + mpsc::Sender<UpdateMessage>, + JoinSet<()>, + mpsc::Sender<SupervisorCommand>, + WriteHandle, + )>, + db: SqlitePool, + sender: mpsc::Sender<UpdateMessage>, + supervisor_control: mpsc::Sender<SupervisorCommand>, + jabber_write: WriteHandle, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + control_receiver, + stream, + on_crash, + db, + sender, + supervisor_control, + jabber_write, + JoinSet::new(), + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect( + stream: BoundJabberReader<Tls>, + on_crash: oneshot::Sender<( + SqlitePool, + mpsc::Sender<UpdateMessage>, + JoinSet<()>, + mpsc::Sender<SupervisorCommand>, + WriteHandle, + )>, db: SqlitePool, sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, jabber_write: WriteHandle, + tasks: JoinSet<()>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -109,6 +245,7 @@ impl ReadControlHandle { sender, supervisor_control, jabber_write, + tasks, ); let handle = tokio::spawn(async move { actor.run().await }); |