diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index c1e37b4..3c61780 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -80,10 +80,15 @@ impl Read { } async fn run(mut self) { + println!("started read thread"); + // let stanza = self.stream.read::<Stanza>().await; + // println!("{:?}", stanza); loop { tokio::select! { // if still haven't received the end tag in time, just kill itself - _ = &mut self.disconnect_timedout => { + // TODO: is this okay??? what if notification thread dies? + Ok(()) = &mut self.disconnect_timedout => { + println!("disconnect_timedout"); break; } Some(msg) = self.control_receiver.recv() => { @@ -99,17 +104,19 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); + let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); break; }, }; }, - stanza = self.stream.read::<Stanza>() => { - match stanza { + s = self.stream.read::<Stanza>() => { + println!("read stanza"); + match s { Ok(s) => { self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); }, Err(e) => { + println!("error: {:?}", e); // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true // match e { // peanuts::Error::ReadError(error) => todo!(), @@ -134,7 +141,7 @@ impl Read { break; } else { // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around - let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); + let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); } break; }, @@ -142,11 +149,12 @@ impl Read { }, else => break } - // when it aborts, must clear iq map no matter what - let mut iqs = self.pending_iqs.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(Error::LostConnection)); - } + } + println!("stopping read thread"); + // when it aborts, must clear iq map no matter what + let mut iqs = self.pending_iqs.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(Error::LostConnection)); } } } @@ -163,7 +171,7 @@ async fn handle_stanza( supervisor_control: mpsc::Sender<SupervisorCommand>, write_handle: WriteHandle, ) { - todo!() + println!("{:?}", stanza) } pub enum ReadControl { |