aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs183
1 files changed, 160 insertions, 23 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 7800d56..edc6cdb 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,3 +1,8 @@
+use std::{
+ ops::{Deref, DerefMut},
+ time::Duration,
+};
+
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool;
use stanza::client::Stanza;
@@ -6,7 +11,7 @@ use tokio::{
task::{JoinHandle, JoinSet},
};
-use crate::UpdateMessage;
+use crate::{error::Error, UpdateMessage};
use super::{
write::{WriteHandle, WriteMessage},
@@ -17,25 +22,41 @@ pub struct Read {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
+ disconnecting: bool,
+ disconnect_timedout: oneshot::Receiver<()>,
}
impl Read {
fn new(
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_sender: WriteHandle,
+ write_handle: WriteHandle,
+ tasks: JoinSet<()>,
) -> Self {
+ let (send, recv) = oneshot::channel();
Self {
control_receiver,
stream,
@@ -43,26 +64,73 @@ impl Read {
db,
update_sender,
supervisor_control,
- write_handle: write_sender,
- tasks: JoinSet::new(),
+ write_handle,
+ tasks,
+ disconnecting: false,
+ disconnect_timedout: recv,
}
}
async fn run(mut self) {
loop {
tokio::select! {
+ // if still haven't received the end tag in time, just kill itself
+ _ = &mut self.disconnect_timedout => {
+ break;
+ }
Some(msg) = self.control_receiver.recv() => {
match msg {
- ReadControl::Disconnect => todo!(),
- ReadControl::Abort(sender) => todo!(),
+ // when disconnect received,
+ ReadControl::Disconnect => {
+ let (send, recv) = oneshot::channel();
+ self.disconnect_timedout = recv;
+ self.disconnecting = true;
+ tokio::spawn(async {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ let _ = send.send(());
+ })
+ },
+ ReadControl::Abort(sender) => {
+ let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ break;
+ },
};
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
- Ok(_) => todo!(),
- Err(_) => todo!(),
+ Ok(s) => {
+ self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
+ },
+ Err(e) => {
+ // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
+ // match e {
+ // peanuts::Error::ReadError(error) => todo!(),
+ // peanuts::Error::Utf8Error(utf8_error) => todo!(),
+ // peanuts::Error::ParseError(_) => todo!(),
+ // peanuts::Error::EntityProcessError(_) => todo!(),
+ // peanuts::Error::InvalidCharRef(_) => todo!(),
+ // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
+ // peanuts::Error::DuplicateAttribute(_) => todo!(),
+ // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
+ // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
+ // peanuts::Error::NotInElement(_) => todo!(),
+ // peanuts::Error::ExtraData(_) => todo!(),
+ // peanuts::Error::UndeclaredNamespace(_) => todo!(),
+ // peanuts::Error::IncorrectName(name) => todo!(),
+ // peanuts::Error::DeserializeError(_) => todo!(),
+ // peanuts::Error::Deserialize(deserialize_error) => todo!(),
+ // peanuts::Error::RootElementEnded => todo!(),
+ // }
+ // TODO: make sure this only happens when an end tag is received
+ if self.disconnecting == true {
+ break;
+ } else {
+ // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
+ let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ }
+ break;
+ },
}
- self.tasks.spawn();
},
else => break
}
@@ -70,34 +138,102 @@ impl Read {
}
}
-trait Task {
- async fn handle();
-}
-
-impl Task for Stanza {
- async fn handle() {
- todo!()
- }
+// what do stanza processes do?
+// - update ui
+// - access database
+// - disconnect proper, reconnect
+// - respond to server requests
+async fn handle_stanza(
+ stanza: Stanza,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ db: SqlitePool,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_handle: WriteHandle,
+) {
+ todo!()
}
-enum ReadControl {
+pub enum ReadControl {
Disconnect,
- Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+ Abort(
+ oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
+ ),
}
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
- handle: JoinHandle<()>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for ReadControlHandle {
+ type Target = mpsc::Sender<ReadControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for ReadControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
}
impl ReadControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ jabber_write: WriteHandle,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ jabber_write,
+ JoinSet::new(),
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect(
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
+ tasks: JoinSet<()>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -109,6 +245,7 @@ impl ReadControlHandle {
sender,
supervisor_control,
jabber_write,
+ tasks,
);
let handle = tokio::spawn(async move { actor.run().await });