aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection/mod.rs')
-rw-r--r--lampada/src/connection/mod.rs69
1 files changed, 38 insertions, 31 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 2d570ae..51c3758 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -7,14 +7,16 @@ use std::{
time::Duration,
};
-use jid::JID;
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use jid::{BareJID, FullJID, JID};
+use luz::jabber_stream::bound_stream::BoundJabberStream;
use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::{client::Stanza, stream_error::Error as StreamError};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
@@ -46,7 +48,12 @@ pub enum SupervisorCommand {
Reconnect(ReadState),
}
-impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
+impl<Lgc> Supervisor<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
@@ -86,13 +93,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
info!("sent disconnect command");
tokio::select! {
_ = async { tokio::join!(
- async { let _ = (&mut self.write_control_handle.handle).await; },
- async { let _ = (&mut self.read_control_handle.handle).await; }
+ // async { let _ = (&mut self.write_control_handle.handle).await; },
+ // async { let _ = (&mut self.read_control_handle.handle).await; }
) } => {},
// TODO: config timeout
_ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
- (&mut self.read_control_handle.handle).abort();
- (&mut self.write_control_handle.handle).abort();
+ // (&mut self.read_control_handle.handle).abort();
+ // (&mut self.write_control_handle.handle).abort();
}
}
info!("disconnected");
@@ -114,12 +121,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
break
}
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
+ let mut jid = self.connected.jid.clone().into();
// TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -129,7 +135,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -162,12 +168,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
else => break,
};
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
+ let mut jid = self.connected.jid.clone().into();
// TODO: same here
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -177,7 +182,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -207,11 +212,10 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
else => break,
};
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let mut jid = self.connected.jid.clone().into();
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -225,7 +229,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -257,7 +261,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
pub struct SupervisorHandle {
sender: SupervisorSender,
- handle: JoinHandle<()>,
+ // TODO: is not having handles fine?
+ // handle: JoinHandle<()>,
}
impl Deref for SupervisorHandle {
@@ -294,14 +299,18 @@ impl DerefMut for SupervisorSender {
}
impl SupervisorHandle {
- pub fn new<Lgc: Logic + Clone + Send + 'static>(
- streams: BoundJabberStream<Tls>,
+ pub fn new<Lgc>(
+ streams: BoundJabberStream,
on_crash: oneshot::Sender<()>,
- jid: JID,
- server: JID,
+ jid: FullJID,
password: Arc<String>,
logic: Lgc,
- ) -> (WriteHandle, Self) {
+ ) -> (WriteHandle, Self)
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (command_send, command_recv) = mpsc::channel(20);
let (writer_crash_send, writer_crash_recv) = oneshot::channel();
let (reader_crash_send, reader_crash_recv) = oneshot::channel();
@@ -314,7 +323,6 @@ impl SupervisorHandle {
let connected = Connected {
jid,
write_handle: write_handle.clone(),
- server,
};
let supervisor_sender = SupervisorSender {
@@ -341,13 +349,12 @@ impl SupervisorHandle {
logic,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
(
write_handle,
Self {
sender: supervisor_sender,
- handle,
},
)
}