diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index d310a12..4e55bc5 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + marker::PhantomData, ops::{Deref, DerefMut}, str::FromStr, sync::Arc, @@ -22,13 +23,13 @@ use crate::{ error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, - Connected, LogicState, UpdateMessage, + Connected, Logic, LogicState, UpdateMessage, }; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; /// read actor -pub struct Read { +pub struct Read<Lgc> { stream: BoundJabberReader<Tls>, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, @@ -39,7 +40,7 @@ pub struct Read { // for handling incoming stanzas // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, // control stuff @@ -54,12 +55,12 @@ pub struct ReadState { pub tasks: JoinSet<()>, } -impl Read { +impl<Lgc> Read<Lgc> { fn new( stream: BoundJabberReader<Tls>, tasks: JoinSet<()>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, control_receiver: mpsc::Receiver<ReadControl>, on_crash: oneshot::Sender<ReadState>, @@ -77,7 +78,9 @@ impl Read { on_crash, } } +} +impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { async fn run(mut self) { println!("started read thread"); // let stanza = self.stream.read::<Stanza>().await; @@ -149,11 +152,7 @@ impl Read { } } println!("stopping read thread"); - // when it aborts, must clear iq map no matter what - let mut iqs = self.logic.pending.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } + self.logic.on_abort().await; } } @@ -188,10 +187,10 @@ impl DerefMut for ReadControlHandle { } impl ReadControlHandle { - pub fn new( + pub fn new<Lgc: Clone + Logic + Send + 'static>( stream: BoundJabberReader<Tls>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<ReadState>, ) -> Self { @@ -214,11 +213,11 @@ impl ReadControlHandle { } } - pub fn reconnect( + pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( stream: BoundJabberReader<Tls>, tasks: JoinSet<()>, connected: Connected, - logic: LogicState, + logic: Lgc, supervisor_control: SupervisorSender, on_crash: oneshot::Sender<ReadState>, ) -> Self { |