aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection/read.rs')
-rw-r--r--lampada/src/connection/read.rs78
1 files changed, 48 insertions, 30 deletions
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index 640ca8e..591a2cb 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -7,7 +7,8 @@ use std::{
time::Duration,
};
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use futures::{future::Fuse, FutureExt};
+use luz::jabber_stream::bound_stream::BoundJabberReader;
use stanza::client::Stanza;
use stanza::stream::Error as StreamErrorStanza;
use stanza::stream_error::Error as StreamError;
@@ -15,6 +16,8 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use crate::{Connected, Logic, WriteMessage};
@@ -23,12 +26,12 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
pub struct Read<Lgc> {
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
disconnecting: bool,
- disconnect_timedout: oneshot::Receiver<()>,
+ disconnect_timedout: Fuse<oneshot::Receiver<()>>,
// all the threads spawned by the current connection session
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
@@ -45,13 +48,13 @@ pub struct Read<Lgc> {
pub struct ReadState {
pub supervisor_control: SupervisorSender,
// TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
- pub tasks: JoinSet<()>,
+ // pub tasks: JoinSet<()>,
}
impl<Lgc> Read<Lgc> {
fn new(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
+ stream: BoundJabberReader,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
@@ -62,8 +65,8 @@ impl<Lgc> Read<Lgc> {
Self {
stream,
disconnecting: false,
- disconnect_timedout: recv,
- tasks,
+ disconnect_timedout: recv.fuse(),
+ // tasks,
connected,
logic,
supervisor_control,
@@ -73,7 +76,12 @@ impl<Lgc> Read<Lgc> {
}
}
-impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+impl<Lgc> Read<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
@@ -91,7 +99,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
// when disconnect received,
ReadControl::Disconnect => {
let (send, recv) = oneshot::channel();
- self.disconnect_timedout = recv;
+ self.disconnect_timedout = recv.fuse();
self.disconnecting = true;
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
@@ -99,7 +107,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control });
break;
},
};
@@ -111,11 +119,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
match s {
Stanza::Error(error) => {
self.logic.clone().handle_stream_error(error).await;
- self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await;
+ self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await;
break;
},
_ => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
+ tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
}
};
},
@@ -128,7 +136,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
let stream_error = match e {
peanuts::Error::ReadError(error) => None,
peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding),
- peanuts::Error::ParseError(_) => Some(StreamError::BadFormat),
+ peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat),
peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),
peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),
peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed),
@@ -139,9 +147,10 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),
peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),
peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml),
+ _ => None,
};
- let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
+ let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control }));
}
break;
},
@@ -168,7 +177,8 @@ pub enum ReadControl {
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
- pub(crate) handle: JoinHandle<()>,
+ // TODO: same here
+ // pub(crate) handle: JoinHandle<()>,
}
impl Deref for ReadControlHandle {
@@ -186,56 +196,64 @@ impl DerefMut for ReadControlHandle {
}
impl ReadControlHandle {
- pub fn new<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
+ pub fn new<Lgc>(
+ stream: BoundJabberReader,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- JoinSet::new(),
+ // JoinSet::new(),
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
- pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
+ pub fn reconnect<Lgc>(
+ stream: BoundJabberReader,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- tasks,
+ // tasks,
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
}