use std::{ collections::HashMap, marker::PhantomData, ops::{Deref, DerefMut}, str::FromStr, sync::Arc, time::Duration, }; use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use stanza::client::Stanza; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; use crate::{Connected, Logic}; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor pub struct Read { stream: BoundJabberReader, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, // all the threads spawned by the current connection session tasks: JoinSet<()>, // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, // control stuff control_receiver: mpsc::Receiver, on_crash: oneshot::Sender, } /// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue pub struct ReadState { pub supervisor_control: SupervisorSender, // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream pub tasks: JoinSet<()>, } impl Read { fn new( stream: BoundJabberReader, tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, control_receiver: mpsc::Receiver, on_crash: oneshot::Sender, ) -> Self { let (_send, recv) = oneshot::channel(); Self { stream, disconnecting: false, disconnect_timedout: recv, tasks, connected, logic, supervisor_control, control_receiver, on_crash, } } } impl Read { async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::().await; // println!("{:?}", stanza); loop { tokio::select! { // if still haven't received the end tag in time, just kill itself // TODO: is this okay??? what if notification thread dies? Ok(()) = &mut self.disconnect_timedout => { info!("disconnect_timedout"); break; } Some(msg) = self.control_receiver.recv() => { match msg { // when disconnect received, ReadControl::Disconnect => { let (send, recv) = oneshot::channel(); self.disconnect_timedout = recv; self.disconnecting = true; tokio::spawn(async { tokio::time::sleep(Duration::from_secs(10)).await; let _ = send.send(()); }) }, ReadControl::Abort(sender) => { let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); break; }, }; }, s = self.stream.read::() => { println!("read stanza"); match s { Ok(s) => { self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); }, Err(e) => { println!("error: {:?}", e); // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true // match e { // peanuts::Error::ReadError(error) => todo!(), // peanuts::Error::Utf8Error(utf8_error) => todo!(), // peanuts::Error::ParseError(_) => todo!(), // peanuts::Error::EntityProcessError(_) => todo!(), // peanuts::Error::InvalidCharRef(_) => todo!(), // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), // peanuts::Error::DuplicateAttribute(_) => todo!(), // peanuts::Error::UnqualifiedNamespace(_) => todo!(), // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), // peanuts::Error::NotInElement(_) => todo!(), // peanuts::Error::ExtraData(_) => todo!(), // peanuts::Error::UndeclaredNamespace(_) => todo!(), // peanuts::Error::IncorrectName(name) => todo!(), // peanuts::Error::DeserializeError(_) => todo!(), // peanuts::Error::Deserialize(deserialize_error) => todo!(), // peanuts::Error::RootElementEnded => todo!(), // } // TODO: make sure this only happens when an end tag is received if self.disconnecting == true { break; } else { let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); } break; }, } }, else => break } } println!("stopping read thread"); self.logic.on_abort().await; } } // what do stanza processes do? // - update ui // - access database // - disconnect proper, reconnect // - respond to server requests pub enum ReadControl { Disconnect, Abort(oneshot::Sender), } pub struct ReadControlHandle { sender: mpsc::Sender, pub(crate) handle: JoinHandle<()>, } impl Deref for ReadControlHandle { type Target = mpsc::Sender; fn deref(&self) -> &Self::Target { &self.sender } } impl DerefMut for ReadControlHandle { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.sender } } impl ReadControlHandle { pub fn new( stream: BoundJabberReader, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, JoinSet::new(), connected, logic, supervisor_control, control_receiver, on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, handle, } } pub fn reconnect( stream: BoundJabberReader, tasks: JoinSet<()>, connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( stream, tasks, connected, logic, supervisor_control, control_receiver, on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); Self { sender: control_sender, handle, } } }