diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 242 |
1 files changed, 0 insertions, 242 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs deleted file mode 100644 index 4e55bc5..0000000 --- a/luz/src/connection/read.rs +++ /dev/null @@ -1,242 +0,0 @@ -use std::{ - collections::HashMap, - marker::PhantomData, - ops::{Deref, DerefMut}, - str::FromStr, - sync::Arc, - time::Duration, -}; - -use chrono::{DateTime, Utc}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; -use stanza::client::Stanza; -use tokio::{ - sync::{mpsc, oneshot, Mutex}, - task::{JoinHandle, JoinSet}, -}; -use tracing::info; -use uuid::Uuid; - -use crate::{ - chat::{Body, Message}, - db::Db, - error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, - presence::{Offline, Online, Presence, PresenceType, Show}, - roster::Contact, - Connected, Logic, LogicState, UpdateMessage, -}; - -use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; - -/// read actor -pub struct Read<Lgc> { - stream: BoundJabberReader<Tls>, - disconnecting: bool, - disconnect_timedout: oneshot::Receiver<()>, - - // all the threads spawned by the current connection session - tasks: JoinSet<()>, - - // for handling incoming stanzas - // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - - // control stuff - control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, -} - -/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue -pub struct ReadState { - pub supervisor_control: SupervisorSender, - // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, -} - -impl<Lgc> Read<Lgc> { - fn new( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (_send, recv) = oneshot::channel(); - Self { - stream, - disconnecting: false, - disconnect_timedout: recv, - tasks, - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - } - } -} - -impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { - async fn run(mut self) { - println!("started read thread"); - // let stanza = self.stream.read::<Stanza>().await; - // println!("{:?}", stanza); - loop { - tokio::select! { - // if still haven't received the end tag in time, just kill itself - // TODO: is this okay??? what if notification thread dies? - Ok(()) = &mut self.disconnect_timedout => { - info!("disconnect_timedout"); - break; - } - Some(msg) = self.control_receiver.recv() => { - match msg { - // when disconnect received, - ReadControl::Disconnect => { - let (send, recv) = oneshot::channel(); - self.disconnect_timedout = recv; - self.disconnecting = true; - tokio::spawn(async { - tokio::time::sleep(Duration::from_secs(10)).await; - let _ = send.send(()); - }) - }, - ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); - break; - }, - }; - }, - s = self.stream.read::<Stanza>() => { - println!("read stanza"); - match s { - Ok(s) => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); - }, - Err(e) => { - println!("error: {:?}", e); - // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true - // match e { - // peanuts::Error::ReadError(error) => todo!(), - // peanuts::Error::Utf8Error(utf8_error) => todo!(), - // peanuts::Error::ParseError(_) => todo!(), - // peanuts::Error::EntityProcessError(_) => todo!(), - // peanuts::Error::InvalidCharRef(_) => todo!(), - // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), - // peanuts::Error::DuplicateAttribute(_) => todo!(), - // peanuts::Error::UnqualifiedNamespace(_) => todo!(), - // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), - // peanuts::Error::NotInElement(_) => todo!(), - // peanuts::Error::ExtraData(_) => todo!(), - // peanuts::Error::UndeclaredNamespace(_) => todo!(), - // peanuts::Error::IncorrectName(name) => todo!(), - // peanuts::Error::DeserializeError(_) => todo!(), - // peanuts::Error::Deserialize(deserialize_error) => todo!(), - // peanuts::Error::RootElementEnded => todo!(), - // } - // TODO: make sure this only happens when an end tag is received - if self.disconnecting == true { - break; - } else { - let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); - } - break; - }, - } - }, - else => break - } - } - println!("stopping read thread"); - self.logic.on_abort().await; - } -} - -// what do stanza processes do? -// - update ui -// - access database -// - disconnect proper, reconnect -// - respond to server requests - -pub enum ReadControl { - Disconnect, - Abort(oneshot::Sender<ReadState>), -} - -pub struct ReadControlHandle { - sender: mpsc::Sender<ReadControl>, - pub(crate) handle: JoinHandle<()>, -} - -impl Deref for ReadControlHandle { - type Target = mpsc::Sender<ReadControl>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for ReadControlHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl ReadControlHandle { - pub fn new<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Read::new( - stream, - JoinSet::new(), - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - ); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } - - pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Read::new( - stream, - tasks, - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - ); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } -} |