aboutsummaryrefslogtreecommitdiffstats
path: root/lampada
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lampada/Cargo.toml6
-rw-r--r--lampada/src/connection/mod.rs27
-rw-r--r--lampada/src/connection/read.rs55
-rw-r--r--lampada/src/connection/write.rs11
-rw-r--r--lampada/src/error.rs11
-rw-r--r--lampada/src/lib.rs74
6 files changed, 130 insertions, 54 deletions
diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml
index 22f133d..090aa9f 100644
--- a/lampada/Cargo.toml
+++ b/lampada/Cargo.toml
@@ -9,13 +9,13 @@ luz = { version = "0.1.0", path = "../luz" }
peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" }
jid = { version = "0.1.0", path = "../jid" }
stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
-tokio = { version = "1.42.0", features = ["macros"] }
+tokio = { workspace = true, features = ["macros", "sync"] }
tracing = "0.1.41"
thiserror = "2.0.11"
[target.'cfg(target_arch = "wasm32")'.dependencies]
-tokio = { version = "1.42.0", features = ["macros", "rt", "time"] }
-wasm-bindgen-futures = "0.4"
+tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] }
+tokio_with_wasm = { version = "0.8.2", features = ["macros", "rt", "time", "sync"] }
# [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
# tokio = { version = "1.42.0", features = ["rt-multi-thread"] }
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index a3dde16..3a3187f 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -15,6 +15,8 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
@@ -46,7 +48,12 @@ pub enum SupervisorCommand {
Reconnect(ReadState),
}
-impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
+impl<Lgc> Supervisor<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
@@ -129,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -177,7 +184,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -225,7 +232,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
self.reader_crash = recv;
self.read_control_handle = ReadControlHandle::reconnect(
read,
- read_state.tasks,
+ // read_state.tasks,
self.connected.clone(),
self.logic.clone(),
read_state.supervisor_control,
@@ -295,14 +302,19 @@ impl DerefMut for SupervisorSender {
}
impl SupervisorHandle {
- pub fn new<Lgc: Logic + Clone + Send + 'static>(
+ pub fn new<Lgc>(
streams: BoundJabberStream,
on_crash: oneshot::Sender<()>,
jid: JID,
server: JID,
password: Arc<String>,
logic: Lgc,
- ) -> (WriteHandle, Self) {
+ ) -> (WriteHandle, Self)
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (command_send, command_recv) = mpsc::channel(20);
let (writer_crash_send, writer_crash_recv) = oneshot::channel();
let (reader_crash_send, reader_crash_recv) = oneshot::channel();
@@ -342,9 +354,6 @@ impl SupervisorHandle {
logic,
);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run().await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run().await });
(
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index 4451d99..591a2cb 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -16,6 +16,8 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::info;
use crate::{Connected, Logic, WriteMessage};
@@ -29,7 +31,7 @@ pub struct Read<Lgc> {
disconnect_timedout: Fuse<oneshot::Receiver<()>>,
// all the threads spawned by the current connection session
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
@@ -46,13 +48,13 @@ pub struct Read<Lgc> {
pub struct ReadState {
pub supervisor_control: SupervisorSender,
// TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
- pub tasks: JoinSet<()>,
+ // pub tasks: JoinSet<()>,
}
impl<Lgc> Read<Lgc> {
fn new(
stream: BoundJabberReader,
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
@@ -64,7 +66,7 @@ impl<Lgc> Read<Lgc> {
stream,
disconnecting: false,
disconnect_timedout: recv.fuse(),
- tasks,
+ // tasks,
connected,
logic,
supervisor_control,
@@ -74,7 +76,12 @@ impl<Lgc> Read<Lgc> {
}
}
-impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+impl<Lgc> Read<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
@@ -100,7 +107,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control });
break;
},
};
@@ -112,11 +119,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
match s {
Stanza::Error(error) => {
self.logic.clone().handle_stream_error(error).await;
- self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await;
+ self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await;
break;
},
_ => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
+ tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
}
};
},
@@ -143,7 +150,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
_ => None,
};
- let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
+ let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control }));
}
break;
},
@@ -189,27 +196,29 @@ impl DerefMut for ReadControlHandle {
}
impl ReadControlHandle {
- pub fn new<Lgc: Clone + Logic + Send + 'static>(
+ pub fn new<Lgc>(
stream: BoundJabberReader,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- JoinSet::new(),
+ // JoinSet::new(),
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run().await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run().await });
Self {
@@ -217,28 +226,30 @@ impl ReadControlHandle {
}
}
- pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
+ pub fn reconnect<Lgc>(
stream: BoundJabberReader,
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
- ) -> Self {
+ ) -> Self
+ where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+ {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
- tasks,
+ // tasks,
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run().await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run().await });
Self {
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
index 4c6ed24..b982eea 100644
--- a/lampada/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -8,6 +8,8 @@ use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use crate::error::WriteError;
@@ -218,9 +220,6 @@ impl WriteControlHandle {
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run().await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run().await });
(
@@ -242,9 +241,6 @@ impl WriteControlHandle {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
Self {
@@ -261,9 +257,6 @@ impl WriteControlHandle {
let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- #[cfg(target_arch = "wasm32")]
- wasm_bindgen_futures::spawn_local(async move { actor.run().await });
- #[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { actor.run().await });
Self {
diff --git a/lampada/src/error.rs b/lampada/src/error.rs
index 8104155..40be012 100644
--- a/lampada/src/error.rs
+++ b/lampada/src/error.rs
@@ -1,11 +1,14 @@
use std::sync::Arc;
+#[cfg(not(target_arch = "wasm32"))]
+use ::tokio::time::error::Elapsed;
use stanza::client::Stanza;
use thiserror::Error;
-use tokio::{
- sync::{mpsc::error::SendError, oneshot::error::RecvError},
- time::error::Elapsed,
-};
+use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
+#[cfg(target_arch = "wasm32")]
+use tokio::time::Elapsed;
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
#[derive(Debug, Error, Clone)]
pub enum ConnectionError {
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
index 7346c42..dacc56d 100644
--- a/lampada/src/lib.rs
+++ b/lampada/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(where_clause_attrs)]
+
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
@@ -21,6 +23,8 @@ use tokio::{
task::JoinSet,
time::timeout,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, info};
use crate::connection::write::WriteHandle;
@@ -58,20 +62,24 @@ pub trait Logic {
type Cmd;
/// run after binding to the stream (e.g. for a chat client, )
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
/// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_disconnect(
self,
connection: Connected,
) -> impl std::future::Future<Output = ()> + Send;
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stream_error(
self,
stream_error: StreamError,
) -> impl std::future::Future<Output = ()> + Send;
/// run to handle an incoming xmpp stanza
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_stanza(
self,
stanza: Stanza,
@@ -79,6 +87,7 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when a connection is currently established
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_online(
self,
command: Self::Cmd,
@@ -86,21 +95,67 @@ pub trait Logic {
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when disconnected
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_offline(
self,
command: Self::Cmd,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(not(target_arch = "wasm32"))]
fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// handle connection errors from the core client logic
+ #[cfg(not(target_arch = "wasm32"))]
fn handle_connection_error(
self,
error: ConnectionError,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
// async fn handle_stream_error(self, error) {}
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ #[cfg(target_arch = "wasm32")]
+ fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>;
+
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stream_error(
+ self,
+ stream_error: StreamError,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle an incoming xmpp stanza
+ #[cfg(target_arch = "wasm32")]
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when a connection is currently established
+ #[cfg(target_arch = "wasm32")]
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()>;
+
+ /// run to handle a command message when disconnected
+ #[cfg(target_arch = "wasm32")]
+ fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ #[cfg(target_arch = "wasm32")]
+ fn on_abort(self) -> impl std::future::Future<Output = ()>;
+
+ /// handle connection errors from the core client logic
+ #[cfg(target_arch = "wasm32")]
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()>;
}
/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
@@ -117,10 +172,15 @@ pub struct CoreClient<Lgc: Logic> {
logic: Lgc,
// config: LampConfig,
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
- tasks: JoinSet<()>,
+ // tasks: JoinSet<()>,
}
-impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+impl<Lgc> CoreClient<Lgc>
+where
+ Lgc: Logic + Clone + 'static,
+ #[cfg(not(target_arch = "wasm32"))]
+ Lgc: Send,
+{
/// create a new actor
pub fn new(
jid: JID,
@@ -137,7 +197,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
receiver,
connection_supervisor_shutdown,
logic,
- tasks: JoinSet::new(),
+ // tasks: JoinSet::new(),
}
}
@@ -240,10 +300,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
},
CoreClientCommand::Command(command) => {
match self.connected.as_ref() {
- Some((w, s)) => self
- .tasks
- .spawn(self.logic.clone().handle_online(command, w.clone())),
- None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ Some((w, s)) => {
+ tokio::spawn(self.logic.clone().handle_online(command, w.clone()))
+ }
+ None => tokio::spawn(self.logic.clone().handle_offline(command)),
};
}
}