aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/lib.rs')
-rw-r--r--lampada/src/lib.rs120
1 files changed, 81 insertions, 39 deletions
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
index 7346c42..d1d451e 100644
--- a/lampada/src/lib.rs
+++ b/lampada/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(where_clause_attrs)]
+
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
@@ -10,6 +12,7 @@ pub use connection::write::WriteMessage;
pub use connection::SupervisorSender;
use error::ConnectionError;
use futures::{future::Fuse, FutureExt};
+use jid::{BareJID, FullJID};
use luz::JID;
use stanza::client::{
iq::{self, Iq, IqType},
@@ -21,6 +24,8 @@ use tokio::{
task::JoinSet,
time::timeout,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, info};
use crate::connection::write::WriteHandle;
@@ -32,24 +37,18 @@ pub mod error;
#[derive(Clone)]
pub struct Connected {
// full jid will stay stable across reconnections
- jid: JID,
+ jid: FullJID,
write_handle: WriteHandle,
- // the server jid
- server: JID,
}
impl Connected {
- pub fn jid(&self) -> &JID {
+ pub fn jid(&self) -> &FullJID {
&self.jid
}
pub fn write_handle(&self) -> &WriteHandle {
&self.write_handle
}
-
- pub fn server(&self) -> &JID {
- &self.server
- }
}
/// everything that a particular xmpp client must implement
@@ -58,20 +57,24 @@ pub trait Logic {
type Cmd;
/// run after binding to the stream (e.g. for a chat client, )
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
/// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_disconnect(
self,
connection: Connected,
) -> impl std::future::Future<Output = ()> + Send;
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stream_error(
self,
stream_error: StreamError,
) -> impl std::future::Future<Output = ()> + Send;
/// run to handle an incoming xmpp stanza
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stanza(
self,
stanza: Stanza,
@@ -79,6 +82,7 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when a connection is currently established
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_online(
self,
command: Self::Cmd,
@@ -86,21 +90,67 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when disconnected
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_offline(
self,
command: Self::Cmd,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(not(target_arch = "wasm32"))]
fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// handle connection errors from the core client logic
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connection_error(
self,
error: ConnectionError,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
// async fn handle_stream_error(self, error) {}
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(target_arch = "wasm32")]
+ fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stream_error(
+ self,
+ stream_error: StreamError,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle an incoming xmpp stanza
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when a connection is currently established
+ #[cfg(target_arch = "wasm32")]
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when disconnected
+ #[cfg(target_arch = "wasm32")]
+ fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(target_arch = "wasm32")]
+ fn on_abort(self) -> impl std::future::Future<Output = ()>;
+
+ /// handle connection errors from the core client logic
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()>;
}
/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
@@ -117,10 +167,15 @@ pub struct CoreClient<Lgc: Logic> {
logic: Lgc,
// config: LampConfig,
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
}
-impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+impl<Lgc> CoreClient<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
/// create a new actor
pub fn new(
jid: JID,
@@ -137,7 +192,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
receiver,
connection_supervisor_shutdown,
logic,
- tasks: JoinSet::new(),
+ // tasks: JoinSet::new(),
}
}
@@ -157,42 +212,27 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
else => break,
};
match msg {
- CoreClientCommand::Connect => {
+ CoreClientCommand::Connect(sender) => {
match self.connected {
Some(_) => {
+ sender.send(Err(ConnectionError::AlreadyConnected));
self.logic
.clone()
.handle_connection_error(ConnectionError::AlreadyConnected)
.await;
}
None => {
- let mut jid = self.jid.clone();
- let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
let streams_result =
- luz::connect_and_login(&mut jid, &*self.password, &mut domain)
- .await;
- let server: JID = match domain.parse() {
- Ok(j) => j,
- Err(e) => {
- self.logic
- .clone()
- .handle_connection_error(ConnectionError::InvalidServerJID(
- e,
- ))
- .await;
- continue;
- }
- };
+ luz::connect_and_login(&self.jid, &*self.password).await;
match streams_result {
- Ok(s) => {
+ Ok((s, full_jid)) => {
debug!("ok stream result");
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
let (writer, supervisor) = SupervisorHandle::new(
s,
shutdown_send,
- jid.clone(),
- server.clone(),
+ full_jid.clone(),
self.password.clone(),
self.logic.clone(),
);
@@ -201,23 +241,25 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
self.connection_supervisor_shutdown = shutdown_recv;
let connected = Connected {
- jid,
+ jid: full_jid.clone(),
write_handle: writer,
- server,
};
self.logic.clone().handle_connect(connected.clone()).await;
self.connected = Some((connected, supervisor));
+ // REMEMBER TO NOTIFY IT@S GOOD
+ sender.send(Ok(full_jid.resourcepart));
}
Err(e) => {
tracing::error!("error: {}", e);
self.logic
.clone()
.handle_connection_error(ConnectionError::ConnectionFailed(
- e.into(),
+ e.clone().into(),
))
.await;
+ sender.send(Err(ConnectionError::ConnectionFailed(e)));
}
}
}
@@ -240,10 +282,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
},
CoreClientCommand::Command(command) => {
match self.connected.as_ref() {
- Some((w, s)) => self
- .tasks
- .spawn(self.logic.clone().handle_online(command, w.clone())),
- None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ Some((w, s)) => {
+ tokio::spawn(self.logic.clone().handle_online(command, w.clone()))
+ }
+ None => tokio::spawn(self.logic.clone().handle_offline(command)),
};
}
}
@@ -255,7 +297,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
pub enum CoreClientCommand<C> {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
- Connect,
+ Connect(oneshot::Sender<Result<String, ConnectionError>>),
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect,
/// TODO: generics