aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs27
1 files changed, 13 insertions, 14 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index d310a12..4e55bc5 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,5 +1,6 @@
use std::{
collections::HashMap,
+ marker::PhantomData,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
@@ -22,13 +23,13 @@ use crate::{
error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
- Connected, LogicState, UpdateMessage,
+ Connected, Logic, LogicState, UpdateMessage,
};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
-pub struct Read {
+pub struct Read<Lgc> {
stream: BoundJabberReader<Tls>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
@@ -39,7 +40,7 @@ pub struct Read {
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
// control stuff
@@ -54,12 +55,12 @@ pub struct ReadState {
pub tasks: JoinSet<()>,
}
-impl Read {
+impl<Lgc> Read<Lgc> {
fn new(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
control_receiver: mpsc::Receiver<ReadControl>,
on_crash: oneshot::Sender<ReadState>,
@@ -77,7 +78,9 @@ impl Read {
on_crash,
}
}
+}
+impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
@@ -149,11 +152,7 @@ impl Read {
}
}
println!("stopping read thread");
- // when it aborts, must clear iq map no matter what
- let mut iqs = self.logic.pending.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(ReadError::LostConnection));
- }
+ self.logic.on_abort().await;
}
}
@@ -188,10 +187,10 @@ impl DerefMut for ReadControlHandle {
}
impl ReadControlHandle {
- pub fn new(
+ pub fn new<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
@@ -214,11 +213,11 @@ impl ReadControlHandle {
}
}
- pub fn reconnect(
+ pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
- logic: LogicState,
+ logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {