aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs120
1 files changed, 120 insertions, 0 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
new file mode 100644
index 0000000..7800d56
--- /dev/null
+++ b/luz/src/connection/read.rs
@@ -0,0 +1,120 @@
+use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use sqlx::SqlitePool;
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::{JoinHandle, JoinSet},
+};
+
+use crate::UpdateMessage;
+
+use super::{
+ write::{WriteHandle, WriteMessage},
+ SupervisorCommand,
+};
+
+pub struct Read {
+ // TODO: place iq hashmap here
+ control_receiver: mpsc::Receiver<ReadControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_handle: WriteHandle,
+ tasks: JoinSet<()>,
+}
+
+impl Read {
+ fn new(
+ control_receiver: mpsc::Receiver<ReadControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_sender: WriteHandle,
+ ) -> Self {
+ Self {
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ update_sender,
+ supervisor_control,
+ write_handle: write_sender,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ ReadControl::Disconnect => todo!(),
+ ReadControl::Abort(sender) => todo!(),
+ };
+ },
+ stanza = self.stream.read::<Stanza>() => {
+ match stanza {
+ Ok(_) => todo!(),
+ Err(_) => todo!(),
+ }
+ self.tasks.spawn();
+ },
+ else => break
+ }
+ }
+ }
+}
+
+trait Task {
+ async fn handle();
+}
+
+impl Task for Stanza {
+ async fn handle() {
+ todo!()
+ }
+}
+
+enum ReadControl {
+ Disconnect,
+ Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+}
+
+pub struct ReadControlHandle {
+ sender: mpsc::Sender<ReadControl>,
+ handle: JoinHandle<()>,
+}
+
+impl ReadControlHandle {
+ pub fn new(
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ jabber_write: WriteHandle,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ jabber_write,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}