aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs242
1 files changed, 0 insertions, 242 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
deleted file mode 100644
index 4e55bc5..0000000
--- a/luz/src/connection/read.rs
+++ /dev/null
@@ -1,242 +0,0 @@
-use std::{
- collections::HashMap,
- marker::PhantomData,
- ops::{Deref, DerefMut},
- str::FromStr,
- sync::Arc,
- time::Duration,
-};
-
-use chrono::{DateTime, Utc};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
-use stanza::client::Stanza;
-use tokio::{
- sync::{mpsc, oneshot, Mutex},
- task::{JoinHandle, JoinSet},
-};
-use tracing::info;
-use uuid::Uuid;
-
-use crate::{
- chat::{Body, Message},
- db::Db,
- error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
- presence::{Offline, Online, Presence, PresenceType, Show},
- roster::Contact,
- Connected, Logic, LogicState, UpdateMessage,
-};
-
-use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
-
-/// read actor
-pub struct Read<Lgc> {
- stream: BoundJabberReader<Tls>,
- disconnecting: bool,
- disconnect_timedout: oneshot::Receiver<()>,
-
- // all the threads spawned by the current connection session
- tasks: JoinSet<()>,
-
- // for handling incoming stanzas
- // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
-
- // control stuff
- control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
-}
-
-/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
-pub struct ReadState {
- pub supervisor_control: SupervisorSender,
- // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
- pub tasks: JoinSet<()>,
-}
-
-impl<Lgc> Read<Lgc> {
- fn new(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (_send, recv) = oneshot::channel();
- Self {
- stream,
- disconnecting: false,
- disconnect_timedout: recv,
- tasks,
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- }
- }
-}
-
-impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
- async fn run(mut self) {
- println!("started read thread");
- // let stanza = self.stream.read::<Stanza>().await;
- // println!("{:?}", stanza);
- loop {
- tokio::select! {
- // if still haven't received the end tag in time, just kill itself
- // TODO: is this okay??? what if notification thread dies?
- Ok(()) = &mut self.disconnect_timedout => {
- info!("disconnect_timedout");
- break;
- }
- Some(msg) = self.control_receiver.recv() => {
- match msg {
- // when disconnect received,
- ReadControl::Disconnect => {
- let (send, recv) = oneshot::channel();
- self.disconnect_timedout = recv;
- self.disconnecting = true;
- tokio::spawn(async {
- tokio::time::sleep(Duration::from_secs(10)).await;
- let _ = send.send(());
- })
- },
- ReadControl::Abort(sender) => {
- let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
- break;
- },
- };
- },
- s = self.stream.read::<Stanza>() => {
- println!("read stanza");
- match s {
- Ok(s) => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
- },
- Err(e) => {
- println!("error: {:?}", e);
- // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
- // match e {
- // peanuts::Error::ReadError(error) => todo!(),
- // peanuts::Error::Utf8Error(utf8_error) => todo!(),
- // peanuts::Error::ParseError(_) => todo!(),
- // peanuts::Error::EntityProcessError(_) => todo!(),
- // peanuts::Error::InvalidCharRef(_) => todo!(),
- // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
- // peanuts::Error::DuplicateAttribute(_) => todo!(),
- // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
- // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
- // peanuts::Error::NotInElement(_) => todo!(),
- // peanuts::Error::ExtraData(_) => todo!(),
- // peanuts::Error::UndeclaredNamespace(_) => todo!(),
- // peanuts::Error::IncorrectName(name) => todo!(),
- // peanuts::Error::DeserializeError(_) => todo!(),
- // peanuts::Error::Deserialize(deserialize_error) => todo!(),
- // peanuts::Error::RootElementEnded => todo!(),
- // }
- // TODO: make sure this only happens when an end tag is received
- if self.disconnecting == true {
- break;
- } else {
- let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
- }
- break;
- },
- }
- },
- else => break
- }
- }
- println!("stopping read thread");
- self.logic.on_abort().await;
- }
-}
-
-// what do stanza processes do?
-// - update ui
-// - access database
-// - disconnect proper, reconnect
-// - respond to server requests
-
-pub enum ReadControl {
- Disconnect,
- Abort(oneshot::Sender<ReadState>),
-}
-
-pub struct ReadControlHandle {
- sender: mpsc::Sender<ReadControl>,
- pub(crate) handle: JoinHandle<()>,
-}
-
-impl Deref for ReadControlHandle {
- type Target = mpsc::Sender<ReadControl>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for ReadControlHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl ReadControlHandle {
- pub fn new<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Read::new(
- stream,
- JoinSet::new(),
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- );
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-
- pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Read::new(
- stream,
- tasks,
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- );
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-}