aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection/read.rs')
-rw-r--r--lampada/src/connection/read.rs233
1 files changed, 233 insertions, 0 deletions
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
new file mode 100644
index 0000000..cc69387
--- /dev/null
+++ b/lampada/src/connection/read.rs
@@ -0,0 +1,233 @@
+use std::{
+ collections::HashMap,
+ marker::PhantomData,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::{JoinHandle, JoinSet},
+};
+use tracing::info;
+
+use crate::{Connected, Logic};
+
+use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
+
+/// read actor
+pub struct Read<Lgc> {
+ stream: BoundJabberReader<Tls>,
+ disconnecting: bool,
+ disconnect_timedout: oneshot::Receiver<()>,
+
+ // all the threads spawned by the current connection session
+ tasks: JoinSet<()>,
+
+ // for handling incoming stanzas
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
+pub struct ReadState {
+ pub supervisor_control: SupervisorSender,
+ // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
+ pub tasks: JoinSet<()>,
+}
+
+impl<Lgc> Read<Lgc> {
+ fn new(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (_send, recv) = oneshot::channel();
+ Self {
+ stream,
+ disconnecting: false,
+ disconnect_timedout: recv,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ }
+ }
+}
+
+impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+ async fn run(mut self) {
+ println!("started read thread");
+ // let stanza = self.stream.read::<Stanza>().await;
+ // println!("{:?}", stanza);
+ loop {
+ tokio::select! {
+ // if still haven't received the end tag in time, just kill itself
+ // TODO: is this okay??? what if notification thread dies?
+ Ok(()) = &mut self.disconnect_timedout => {
+ info!("disconnect_timedout");
+ break;
+ }
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ // when disconnect received,
+ ReadControl::Disconnect => {
+ let (send, recv) = oneshot::channel();
+ self.disconnect_timedout = recv;
+ self.disconnecting = true;
+ tokio::spawn(async {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ let _ = send.send(());
+ })
+ },
+ ReadControl::Abort(sender) => {
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ break;
+ },
+ };
+ },
+ s = self.stream.read::<Stanza>() => {
+ println!("read stanza");
+ match s {
+ Ok(s) => {
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
+ },
+ Err(e) => {
+ println!("error: {:?}", e);
+ // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
+ // match e {
+ // peanuts::Error::ReadError(error) => todo!(),
+ // peanuts::Error::Utf8Error(utf8_error) => todo!(),
+ // peanuts::Error::ParseError(_) => todo!(),
+ // peanuts::Error::EntityProcessError(_) => todo!(),
+ // peanuts::Error::InvalidCharRef(_) => todo!(),
+ // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
+ // peanuts::Error::DuplicateAttribute(_) => todo!(),
+ // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
+ // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
+ // peanuts::Error::NotInElement(_) => todo!(),
+ // peanuts::Error::ExtraData(_) => todo!(),
+ // peanuts::Error::UndeclaredNamespace(_) => todo!(),
+ // peanuts::Error::IncorrectName(name) => todo!(),
+ // peanuts::Error::DeserializeError(_) => todo!(),
+ // peanuts::Error::Deserialize(deserialize_error) => todo!(),
+ // peanuts::Error::RootElementEnded => todo!(),
+ // }
+ // TODO: make sure this only happens when an end tag is received
+ if self.disconnecting == true {
+ break;
+ } else {
+ let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ }
+ break;
+ },
+ }
+ },
+ else => break
+ }
+ }
+ println!("stopping read thread");
+ self.logic.on_abort().await;
+ }
+}
+
+// what do stanza processes do?
+// - update ui
+// - access database
+// - disconnect proper, reconnect
+// - respond to server requests
+
+pub enum ReadControl {
+ Disconnect,
+ Abort(oneshot::Sender<ReadState>),
+}
+
+pub struct ReadControlHandle {
+ sender: mpsc::Sender<ReadControl>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for ReadControlHandle {
+ type Target = mpsc::Sender<ReadControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for ReadControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl ReadControlHandle {
+ pub fn new<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ JoinSet::new(),
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}