diff options
Diffstat (limited to 'lampada/src/connection/read.rs')
-rw-r--r-- | lampada/src/connection/read.rs | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index cc69387..640ca8e 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -9,13 +9,15 @@ use std::{ use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use stanza::client::Stanza; +use stanza::stream::Error as StreamErrorStanza; +use stanza::stream_error::Error as StreamError; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; -use crate::{Connected, Logic}; +use crate::{Connected, Logic, WriteMessage}; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; @@ -36,7 +38,7 @@ pub struct Read<Lgc> { // control stuff control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, } /// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue @@ -54,7 +56,7 @@ impl<Lgc> Read<Lgc> { logic: Lgc, supervisor_control: SupervisorSender, control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (_send, recv) = oneshot::channel(); Self { @@ -106,34 +108,40 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); + match s { + Stanza::Error(error) => { + self.logic.clone().handle_stream_error(error).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + break; + }, + _ => { + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + } + }; }, Err(e) => { println!("error: {:?}", e); - // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true - // match e { - // peanuts::Error::ReadError(error) => todo!(), - // peanuts::Error::Utf8Error(utf8_error) => todo!(), - // peanuts::Error::ParseError(_) => todo!(), - // peanuts::Error::EntityProcessError(_) => todo!(), - // peanuts::Error::InvalidCharRef(_) => todo!(), - // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), - // peanuts::Error::DuplicateAttribute(_) => todo!(), - // peanuts::Error::UnqualifiedNamespace(_) => todo!(), - // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), - // peanuts::Error::NotInElement(_) => todo!(), - // peanuts::Error::ExtraData(_) => todo!(), - // peanuts::Error::UndeclaredNamespace(_) => todo!(), - // peanuts::Error::IncorrectName(name) => todo!(), - // peanuts::Error::DeserializeError(_) => todo!(), - // peanuts::Error::Deserialize(deserialize_error) => todo!(), - // peanuts::Error::RootElementEnded => todo!(), - // } // TODO: make sure this only happens when an end tag is received if self.disconnecting == true { break; } else { - let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let stream_error = match e { + peanuts::Error::ReadError(error) => None, + peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), + peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), + peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), + peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), + peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), + peanuts::Error::DuplicateAttribute(_) => Some(StreamError::NotWellFormed), + peanuts::Error::MismatchedEndTag(name, name1) => Some(StreamError::NotWellFormed), + peanuts::Error::NotInElement(_) => Some(StreamError::InvalidXml), + peanuts::Error::ExtraData(_) => None, + peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), + peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), + peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), + }; + + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); } break; }, @@ -183,7 +191,7 @@ impl ReadControlHandle { connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -210,7 +218,7 @@ impl ReadControlHandle { connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); |