Diffstat (limited to 'lampada/src')
 lampada/src/connection/mod.rs   |  69 +-
 lampada/src/connection/read.rs  |  78 +-
 lampada/src/connection/write.rs |  26 +-
 lampada/src/error.rs            |  15 +-
 lampada/src/lib.rs              | 120 +-
 5 files changed, 190 insertions(+), 118 deletions(-)
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 2d570ae..51c3758 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -7,14 +7,16 @@ use std::{
time::Duration,
};
-use jid::JID;
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use jid::{BareJID, FullJID, JID};
+use luz::jabber_stream::bound_stream::BoundJabberStream;
use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::{client::Stanza, stream_error::Error as StreamError};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
@@ -46,7 +48,12 @@ pub enum SupervisorCommand {
Reconnect(ReadState),
}
-impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
+impl<Lgc> Supervisor<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
@@ -86,13 +93,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
info!("sent disconnect command");
tokio::select! {
_ = async { tokio::join!(
- async { let _ = (&mut self.write_control_handle.handle).await; },
- async { let _ = (&mut self.read_control_handle.handle).await; }
+ // async { let _ = (&mut self.write_control_handle.handle).await; },
+ // async { let _ = (&mut self.read_control_handle.handle).await; }
) } => {},
// TODO: config timeout
_ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
- (&mut self.read_control_handle.handle).abort();
- (&mut self.write_control_handle.handle).abort();
+ // (&mut self.read_control_handle.handle).abort();
+ // (&mut self.write_control_handle.handle).abort();
}
}
info!("disconnected");
@@ -114,12 +121,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
break
}
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
+ let mut jid = self.connected.jid.clone().into();
// TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -129,7 +135,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -162,12 +168,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
else => break,
};
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
+ let mut jid = self.connected.jid.clone().into();
// TODO: same here
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -177,7 +182,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -207,11 +212,10 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
else => break,
};
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
- let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let mut jid = self.connected.jid.clone().into();
+ let connection = luz::connect_and_login(&jid, &*self.password).await;
match connection {
- Ok(c) => {
+ Ok((c, full_jid)) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
@@ -225,7 +229,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -257,7 +261,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
pub struct SupervisorHandle {
sender: SupervisorSender,
- handle: JoinHandle<()>,
+ // TODO: is not having handles fine?
+ // handle: JoinHandle<()>,
}
impl Deref for SupervisorHandle {
@@ -294,14 +299,18 @@ impl DerefMut for SupervisorSender {
}
impl SupervisorHandle {
- pub fn new<Lgc: Logic + Clone + Send + 'static>(
- streams: BoundJabberStream<Tls>,
+ pub fn new<Lgc>(
+ streams: BoundJabberStream,
on_crash: oneshot::Sender<()>,
- jid: JID,
- server: JID,
+ jid: FullJID,
password: Arc<String>,
logic: Lgc,
- ) -> (WriteHandle, Self) {
+ ) -> (WriteHandle, Self)
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (command_send, command_recv) = mpsc::channel(20);
let (writer_crash_send, writer_crash_recv) = oneshot::channel();
let (reader_crash_send, reader_crash_recv) = oneshot::channel();
@@ -314,7 +323,6 @@ impl SupervisorHandle {
let connected = Connected {
jid,
write_handle: write_handle.clone(),
- server,
};
let supervisor_sender = SupervisorSender {
@@ -341,13 +349,12 @@ impl SupervisorHandle {
logic,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
(
write_handle,
Self {
sender: supervisor_sender,
- handle,
},
)
}
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index 640ca8e..591a2cb 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -7,7 +7,8 @@ use std::{
time::Duration,
};
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use futures::{future::Fuse, FutureExt};
+use luz::jabber_stream::bound_stream::BoundJabberReader;
use stanza::client::Stanza;
use stanza::stream::Error as StreamErrorStanza;
use stanza::stream_error::Error as StreamError;
@@ -15,6 +16,8 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use crate::{Connected, Logic, WriteMessage};
@@ -23,12 +26,12 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
pub struct Read<Lgc> {
- stream: BoundJabberReader<Tls>,
+ stream: BoundJabberReader,
disconnecting: bool,
- disconnect_timedout: oneshot::Receiver<()>,
+ disconnect_timedout: Fuse<oneshot::Receiver<()>>,
// all the threads spawned by the current connection session
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
@@ -45,13 +48,13 @@ pub struct Read<Lgc> {
pub struct ReadState {
pub supervisor_control: SupervisorSender,
// TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
- pub tasks: JoinSet<()>,
+ // pub tasks: JoinSet<()>,
}
impl<Lgc> Read<Lgc> {
fn new(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
+ stream: BoundJabberReader,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
@@ -62,8 +65,8 @@ impl<Lgc> Read<Lgc> {
Self {
stream,
disconnecting: false,
- disconnect_timedout: recv,
- tasks,
+ disconnect_timedout: recv.fuse(),
+ // tasks,
connected,
logic,
supervisor_control,
@@ -73,7 +76,12 @@ impl<Lgc> Read<Lgc> {
}
}
-impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+impl<Lgc> Read<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
@@ -91,7 +99,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
// when disconnect received,
ReadControl::Disconnect => {
let (send, recv) = oneshot::channel();
- self.disconnect_timedout = recv;
+ self.disconnect_timedout = recv.fuse();
self.disconnecting = true;
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
@@ -99,7 +107,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control });
break;
},
};
@@ -111,11 +119,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
match s {
Stanza::Error(error) => {
self.logic.clone().handle_stream_error(error).await;
- self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await;
+ self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await;
break;
},
_ => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
+ tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
}
};
},
@@ -128,7 +136,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
let stream_error = match e {
peanuts::Error::ReadError(error) => None,
peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding),
- peanuts::Error::ParseError(_) => Some(StreamError::BadFormat),
+ peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat),
peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),
peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),
peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed),
@@ -139,9 +147,10 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),
peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),
peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml),
+ _ => None,
};
- let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
+ let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control }));
}
break;
},
@@ -168,7 +177,8 @@ pub enum ReadControl {
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
- pub(crate) handle: JoinHandle<()>,
+ // TODO: same here
+ // pub(crate) handle: JoinHandle<()>,
}
impl Deref for ReadControlHandle {
@@ -186,56 +196,64 @@ impl DerefMut for ReadControlHandle {
}
impl ReadControlHandle {
- pub fn new<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
+ pub fn new<Lgc>(
+ stream: BoundJabberReader,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- JoinSet::new(),
+ // JoinSet::new(),
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
- pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
+ pub fn reconnect<Lgc>(
+ stream: BoundJabberReader,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- tasks,
+ // tasks,
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
}
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
index 1070cdf..b982eea 100644
--- a/lampada/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut};
-use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use luz::jabber_stream::bound_stream::BoundJabberWriter;
use stanza::{
client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError,
};
@@ -8,12 +8,14 @@ use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use crate::error::WriteError;
/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
/// connection session write queue
stanza_receiver: mpsc::Receiver<WriteMessage>,
@@ -41,7 +43,7 @@ pub enum WriteControl {
impl Write {
fn new(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
stanza_receiver: mpsc::Receiver<WriteMessage>,
control_receiver: mpsc::Receiver<WriteControl>,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
@@ -192,7 +194,7 @@ impl DerefMut for WriteHandle {
pub struct WriteControlHandle {
sender: mpsc::Sender<WriteControl>,
- pub(crate) handle: JoinHandle<()>,
+ // pub(crate) handle: JoinHandle<()>,
}
impl Deref for WriteControlHandle {
@@ -211,14 +213,14 @@ impl DerefMut for WriteControlHandle {
impl WriteControlHandle {
pub fn new(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
) -> (WriteHandle, Self) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
+ tokio::spawn(async move { actor.run().await });
(
WriteHandle {
@@ -226,13 +228,12 @@ impl WriteControlHandle {
},
Self {
sender: control_sender,
- handle,
},
)
}
pub fn reconnect_retry(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
retry_msg: WriteMessage,
@@ -240,27 +241,26 @@ impl WriteControlHandle {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
+ tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
Self {
sender: control_sender,
- handle,
}
}
pub fn reconnect(
- stream: BoundJabberWriter<Tls>,
+ stream: BoundJabberWriter,
on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
+
+ tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
- handle,
}
}
}
diff --git a/lampada/src/error.rs b/lampada/src/error.rs
index 8104155..a5af3a2 100644
--- a/lampada/src/error.rs
+++ b/lampada/src/error.rs
@@ -2,10 +2,13 @@ use std::sync::Arc;
use stanza::client::Stanza;
use thiserror::Error;
-use tokio::{
- sync::{mpsc::error::SendError, oneshot::error::RecvError},
- time::error::Elapsed,
-};
+use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
+#[cfg(not(target_arch = "wasm32"))]
+use tokio::time::error::Elapsed;
+#[cfg(target_arch = "wasm32")]
+use tokio::time::Elapsed;
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
#[derive(Debug, Error, Clone)]
pub enum ConnectionError {
@@ -21,10 +24,11 @@ pub enum ConnectionError {
#[error("disconnected")]
Disconnected,
#[error("invalid server jid: {0}")]
- InvalidServerJID(#[from] jid::ParseError),
+ InvalidServerJID(#[from] jid::JIDError),
}
#[derive(Debug, Error, Clone)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum CommandError<T> {
#[error("actor: {0}")]
Actor(ActorError),
@@ -58,6 +62,7 @@ pub enum ReadError {
}
#[derive(Debug, Error, Clone)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum ActorError {
#[error("receive timed out")]
Timeout,
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
index 7346c42..d1d451e 100644
--- a/lampada/src/lib.rs
+++ b/lampada/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(where_clause_attrs)]
+
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
@@ -10,6 +12,7 @@ pub use connection::write::WriteMessage;
pub use connection::SupervisorSender;
use error::ConnectionError;
use futures::{future::Fuse, FutureExt};
+use jid::{BareJID, FullJID};
use luz::JID;
use stanza::client::{
iq::{self, Iq, IqType},
@@ -21,6 +24,8 @@ use tokio::{
task::JoinSet,
time::timeout,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, info};
use crate::connection::write::WriteHandle;
@@ -32,24 +37,18 @@ pub mod error;
#[derive(Clone)]
pub struct Connected {
// full jid will stay stable across reconnections
- jid: JID,
+ jid: FullJID,
write_handle: WriteHandle,
- // the server jid
- server: JID,
}
impl Connected {
- pub fn jid(&self) -> &JID {
+ pub fn jid(&self) -> &FullJID {
&self.jid
}
pub fn write_handle(&self) -> &WriteHandle {
&self.write_handle
}
-
- pub fn server(&self) -> &JID {
- &self.server
- }
}
/// everything that a particular xmpp client must implement
@@ -58,20 +57,24 @@ pub trait Logic {
type Cmd;
/// run after binding to the stream (e.g. for a chat client, )
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
/// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_disconnect(
self,
connection: Connected,
) -> impl std::future::Future<Output = ()> + Send;
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stream_error(
self,
stream_error: StreamError,
) -> impl std::future::Future<Output = ()> + Send;
/// run to handle an incoming xmpp stanza
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stanza(
self,
stanza: Stanza,
@@ -79,6 +82,7 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when a connection is currently established
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_online(
self,
command: Self::Cmd,
@@ -86,21 +90,67 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when disconnected
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_offline(
self,
command: Self::Cmd,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(not(target_arch = "wasm32"))]
fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// handle connection errors from the core client logic
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connection_error(
self,
error: ConnectionError,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
// async fn handle_stream_error(self, error) {}
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(target_arch = "wasm32")]
+ fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stream_error(
+ self,
+ stream_error: StreamError,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle an incoming xmpp stanza
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when a connection is currently established
+ #[cfg(target_arch = "wasm32")]
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when disconnected
+ #[cfg(target_arch = "wasm32")]
+ fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(target_arch = "wasm32")]
+ fn on_abort(self) -> impl std::future::Future<Output = ()>;
+
+ /// handle connection errors from the core client logic
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()>;
}
/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
@@ -117,10 +167,15 @@ pub struct CoreClient<Lgc: Logic> {
logic: Lgc,
// config: LampConfig,
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
}
-impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+impl<Lgc> CoreClient<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
/// create a new actor
pub fn new(
jid: JID,
@@ -137,7 +192,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
receiver,
connection_supervisor_shutdown,
logic,
- tasks: JoinSet::new(),
+ // tasks: JoinSet::new(),
}
}
@@ -157,42 +212,27 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
else => break,
};
match msg {
- CoreClientCommand::Connect => {
+ CoreClientCommand::Connect(sender) => {
match self.connected {
Some(_) => {
+ sender.send(Err(ConnectionError::AlreadyConnected));
self.logic
.clone()
.handle_connection_error(ConnectionError::AlreadyConnected)
.await;
}
None => {
- let mut jid = self.jid.clone();
- let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
let streams_result =
- luz::connect_and_login(&mut jid, &*self.password, &mut domain)
- .await;
- let server: JID = match domain.parse() {
- Ok(j) => j,
- Err(e) => {
- self.logic
- .clone()
- .handle_connection_error(ConnectionError::InvalidServerJID(
- e,
- ))
- .await;
- continue;
- }
- };
+ luz::connect_and_login(&self.jid, &*self.password).await;
match streams_result {
- Ok(s) => {
+ Ok((s, full_jid)) => {
debug!("ok stream result");
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
let (writer, supervisor) = SupervisorHandle::new(
s,
shutdown_send,
- jid.clone(),
- server.clone(),
+ full_jid.clone(),
self.password.clone(),
self.logic.clone(),
);
@@ -201,23 +241,25 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
self.connection_supervisor_shutdown = shutdown_recv;
let connected = Connected {
- jid,
+ jid: full_jid.clone(),
write_handle: writer,
- server,
};
self.logic.clone().handle_connect(connected.clone()).await;
self.connected = Some((connected, supervisor));
+ // REMEMBER TO NOTIFY IT'S GOOD
+ sender.send(Ok(full_jid.resourcepart));
}
Err(e) => {
tracing::error!("error: {}", e);
self.logic
.clone()
.handle_connection_error(ConnectionError::ConnectionFailed(
- e.into(),
+ e.clone().into(),
))
.await;
+ sender.send(Err(ConnectionError::ConnectionFailed(e)));
}
}
}
@@ -240,10 +282,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
},
CoreClientCommand::Command(command) => {
match self.connected.as_ref() {
- Some((w, s)) => self
- .tasks
- .spawn(self.logic.clone().handle_online(command, w.clone())),
- None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ Some((w, s)) => {
+ tokio::spawn(self.logic.clone().handle_online(command, w.clone()))
+ }
+ None => tokio::spawn(self.logic.clone().handle_offline(command)),
};
}
}
@@ -255,7 +297,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
pub enum CoreClientCommand<C> {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
- Connect,
+ Connect(oneshot::Sender<Result<String, ConnectionError>>),
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect,
/// TODO: generics