aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs26
1 files changed, 23 insertions, 3 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index edc6cdb..c1e37b4 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,5 +1,7 @@
use std::{
+ collections::HashMap,
ops::{Deref, DerefMut},
+ sync::Arc,
time::Duration,
};
@@ -7,7 +9,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool;
use stanza::client::Stanza;
use tokio::{
- sync::{mpsc, oneshot},
+ sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
@@ -28,6 +30,7 @@ pub struct Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -36,6 +39,8 @@ pub struct Read {
tasks: JoinSet<()>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
+ // TODO: use proper stanza ids
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
}
impl Read {
@@ -48,6 +53,7 @@ impl Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -55,6 +61,7 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self {
let (send, recv) = oneshot::channel();
Self {
@@ -68,6 +75,7 @@ impl Read {
tasks,
disconnecting: false,
disconnect_timedout: recv,
+ pending_iqs,
}
}
@@ -91,7 +99,7 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
break;
},
};
@@ -126,7 +134,7 @@ impl Read {
break;
} else {
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
- let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
}
break;
},
@@ -134,6 +142,11 @@ impl Read {
},
else => break
}
+ // when it aborts, must clear iq map no matter what
+ let mut iqs = self.pending_iqs.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(Error::LostConnection));
+ }
}
}
}
@@ -162,6 +175,7 @@ pub enum ReadControl {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>,
),
}
@@ -194,11 +208,13 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -211,6 +227,7 @@ impl ReadControlHandle {
supervisor_control,
jabber_write,
JoinSet::new(),
+ pending_iqs,
);
let handle = tokio::spawn(async move { actor.run().await });
@@ -228,12 +245,14 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
tasks: JoinSet<()>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -246,6 +265,7 @@ impl ReadControlHandle {
supervisor_control,
jabber_write,
tasks,
+ pending_iqs,
);
let handle = tokio::spawn(async move { actor.run().await });