diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs new file mode 100644 index 0000000..7800d56 --- /dev/null +++ b/luz/src/connection/read.rs @@ -0,0 +1,120 @@ +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use sqlx::SqlitePool; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, +}; + +use crate::UpdateMessage; + +use super::{ + write::{WriteHandle, WriteMessage}, + SupervisorCommand, +}; + +pub struct Read { + // TODO: place iq hashmap here + control_receiver: mpsc::Receiver<ReadControl>, + stream: BoundJabberReader<Tls>, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + db: SqlitePool, + update_sender: mpsc::Sender<UpdateMessage>, + supervisor_control: mpsc::Sender<SupervisorCommand>, + write_handle: WriteHandle, + tasks: JoinSet<()>, +} + +impl Read { + fn new( + control_receiver: mpsc::Receiver<ReadControl>, + stream: BoundJabberReader<Tls>, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + db: SqlitePool, + update_sender: mpsc::Sender<UpdateMessage>, + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + supervisor_control: mpsc::Sender<SupervisorCommand>, + write_sender: WriteHandle, + ) -> Self { + Self { + control_receiver, + stream, + on_crash, + db, + update_sender, + supervisor_control, + write_handle: write_sender, + tasks: JoinSet::new(), + } + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + ReadControl::Disconnect => todo!(), + ReadControl::Abort(sender) => todo!(), + }; + }, + stanza = self.stream.read::<Stanza>() => { + match stanza { + Ok(_) => todo!(), + Err(_) => todo!(), + } + self.tasks.spawn(); + }, + else => break + } + } + } +} + +trait Task { + async fn handle(); +} + +impl Task for Stanza { + async fn handle() { + todo!() + } +} + +enum ReadControl { + Disconnect, + Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>), +} + +pub struct ReadControlHandle { + sender: mpsc::Sender<ReadControl>, + handle: JoinHandle<()>, +} + +impl ReadControlHandle { + pub fn new( + stream: BoundJabberReader<Tls>, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, + db: SqlitePool, + sender: mpsc::Sender<UpdateMessage>, + supervisor_control: mpsc::Sender<SupervisorCommand>, + jabber_write: WriteHandle, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + control_receiver, + stream, + on_crash, + db, + sender, + supervisor_control, + jabber_write, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} |