aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs30
1 files changed, 19 insertions, 11 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index c1e37b4..3c61780 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -80,10 +80,15 @@ impl Read {
}
async fn run(mut self) {
+ println!("started read thread");
+ // let stanza = self.stream.read::<Stanza>().await;
+ // println!("{:?}", stanza);
loop {
tokio::select! {
// if still haven't received the end tag in time, just kill itself
- _ = &mut self.disconnect_timedout => {
+ // TODO: is this okay??? what if notification thread dies?
+ Ok(()) = &mut self.disconnect_timedout => {
+ println!("disconnect_timedout");
break;
}
Some(msg) = self.control_receiver.recv() => {
@@ -99,17 +104,19 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
+ let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
break;
},
};
},
- stanza = self.stream.read::<Stanza>() => {
- match stanza {
+ s = self.stream.read::<Stanza>() => {
+ println!("read stanza");
+ match s {
Ok(s) => {
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
},
Err(e) => {
+ println!("error: {:?}", e);
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
// match e {
// peanuts::Error::ReadError(error) => todo!(),
@@ -134,7 +141,7 @@ impl Read {
break;
} else {
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
- let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
+ let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
}
break;
},
@@ -142,11 +149,12 @@ impl Read {
},
else => break
}
- // when it aborts, must clear iq map no matter what
- let mut iqs = self.pending_iqs.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(Error::LostConnection));
- }
+ }
+ println!("stopping read thread");
+ // when it aborts, must clear iq map no matter what
+ let mut iqs = self.pending_iqs.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(Error::LostConnection));
}
}
}
@@ -163,7 +171,7 @@ async fn handle_stanza(
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
) {
- todo!()
+ println!("{:?}", stanza)
}
pub enum ReadControl {