diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 26 |
1 files changed, 23 insertions, 3 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index edc6cdb..c1e37b4 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,5 +1,7 @@ use std::{ + collections::HashMap, ops::{Deref, DerefMut}, + sync::Arc, time::Duration, }; @@ -7,7 +9,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use sqlx::SqlitePool; use stanza::client::Stanza; use tokio::{ - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; @@ -28,6 +30,7 @@ pub struct Read { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, )>, db: SqlitePool, update_sender: mpsc::Sender<UpdateMessage>, @@ -36,6 +39,8 @@ pub struct Read { tasks: JoinSet<()>, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, + // TODO: use proper stanza ids + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, } impl Read { @@ -48,6 +53,7 @@ impl Read { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, )>, db: SqlitePool, update_sender: mpsc::Sender<UpdateMessage>, @@ -55,6 +61,7 @@ impl Read { supervisor_control: mpsc::Sender<SupervisorCommand>, write_handle: WriteHandle, tasks: JoinSet<()>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, ) -> Self { let (send, recv) = oneshot::channel(); Self { @@ -68,6 +75,7 @@ impl Read { tasks, disconnecting: false, disconnect_timedout: recv, + pending_iqs, } } @@ -91,7 +99,7 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); + let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); break; }, }; @@ -126,7 +134,7 @@ impl Read { break; } else { // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around - let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); + let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); } break; }, @@ -134,6 +142,11 @@ impl Read { }, else => break } + // when it aborts, must clear iq map no matter what + let mut iqs = self.pending_iqs.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(Error::LostConnection)); + } } } } @@ -162,6 +175,7 @@ pub enum ReadControl { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, )>, ), } @@ -194,11 +208,13 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, )>, db: SqlitePool, sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, jabber_write: WriteHandle, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -211,6 +227,7 @@ impl ReadControlHandle { supervisor_control, jabber_write, JoinSet::new(), + pending_iqs, ); let handle = tokio::spawn(async move { actor.run().await }); @@ -228,12 +245,14 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, )>, db: SqlitePool, sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, jabber_write: WriteHandle, tasks: JoinSet<()>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -246,6 +265,7 @@ impl ReadControlHandle { supervisor_control, jabber_write, tasks, + pending_iqs, ); let handle = tokio::spawn(async move { actor.run().await }); |