aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection/read.rs')
-rw-r--r--lampada/src/connection/read.rs60
1 files changed, 34 insertions, 26 deletions
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index cc69387..640ca8e 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -9,13 +9,15 @@ use std::{
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
+use stanza::stream::Error as StreamErrorStanza;
+use stanza::stream_error::Error as StreamError;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
-use crate::{Connected, Logic};
+use crate::{Connected, Logic, WriteMessage};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
@@ -36,7 +38,7 @@ pub struct Read<Lgc> {
// control stuff
control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
}
/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
@@ -54,7 +56,7 @@ impl<Lgc> Read<Lgc> {
logic: Lgc,
supervisor_control: SupervisorSender,
control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (_send, recv) = oneshot::channel();
Self {
@@ -106,34 +108,40 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
+ match s {
+ Stanza::Error(error) => {
+ self.logic.clone().handle_stream_error(error).await;
+ self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await;
+ break;
+ },
+ _ => {
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
+ }
+ };
},
Err(e) => {
println!("error: {:?}", e);
- // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
- // match e {
- // peanuts::Error::ReadError(error) => todo!(),
- // peanuts::Error::Utf8Error(utf8_error) => todo!(),
- // peanuts::Error::ParseError(_) => todo!(),
- // peanuts::Error::EntityProcessError(_) => todo!(),
- // peanuts::Error::InvalidCharRef(_) => todo!(),
- // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
- // peanuts::Error::DuplicateAttribute(_) => todo!(),
- // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
- // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
- // peanuts::Error::NotInElement(_) => todo!(),
- // peanuts::Error::ExtraData(_) => todo!(),
- // peanuts::Error::UndeclaredNamespace(_) => todo!(),
- // peanuts::Error::IncorrectName(name) => todo!(),
- // peanuts::Error::DeserializeError(_) => todo!(),
- // peanuts::Error::Deserialize(deserialize_error) => todo!(),
- // peanuts::Error::RootElementEnded => todo!(),
- // }
// TODO: make sure this only happens when an end tag is received
if self.disconnecting == true {
break;
} else {
- let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ let stream_error = match e {
+ peanuts::Error::ReadError(error) => None,
+ peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding),
+ peanuts::Error::ParseError(_) => Some(StreamError::BadFormat),
+ peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),
+ peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),
+ peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed),
+ peanuts::Error::DuplicateAttribute(_) => Some(StreamError::NotWellFormed),
+ peanuts::Error::MismatchedEndTag(name, name1) => Some(StreamError::NotWellFormed),
+ peanuts::Error::NotInElement(_) => Some(StreamError::InvalidXml),
+ peanuts::Error::ExtraData(_) => None,
+ peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),
+ peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),
+ peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml),
+ };
+
+ let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
}
break;
},
@@ -183,7 +191,7 @@ impl ReadControlHandle {
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -210,7 +218,7 @@ impl ReadControlHandle {
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);