author	cel 🌸 <cel@bunny.garden>	2025-03-26 14:29:40 +0000
committer	cel 🌸 <cel@bunny.garden>	2025-03-26 14:29:40 +0000
commit	2211f324782cdc617b4b5ecd071178e372539fe4 (patch)
tree	a5ea5ce11d748424447dee23173d3cb8aec648ea /lampada
parent	2f8671978e18c1e1e7834056ae674f32fbde3868 (diff)
download	luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz
	luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2
	luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to 'lampada')
-rw-r--r--	lampada/.gitignore	2
-rw-r--r--	lampada/Cargo.toml	17
-rw-r--r--	lampada/README.md	3
-rw-r--r--	lampada/scratch	90
-rw-r--r--	lampada/src/connection/mod.rs	374
-rw-r--r--	lampada/src/connection/read.rs	233
-rw-r--r--	lampada/src/connection/write.rs	258
-rw-r--r--	lampada/src/error.rs	82
-rw-r--r--	lampada/src/lib.rs	238
-rw-r--r--	lampada/src/main.rs	42
10 files changed, 1339 insertions(+), 0 deletions(-)
diff --git a/lampada/.gitignore b/lampada/.gitignore
new file mode 100644
index 0000000..60868fd
--- /dev/null
+++ b/lampada/.gitignore
@@ -0,0 +1,2 @@
+luz.db
+.sqlx/
diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml
new file mode 100644
index 0000000..856fd7d
--- /dev/null
+++ b/lampada/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "lampada"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+futures = "0.3.31"
+luz = { version = "0.1.0", path = "../luz" }
+peanuts = { version = "0.1.0", path = "../../peanuts" }
+jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
+stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
+tokio = "1.42.0"
+tokio-stream = "0.1.17"
+tokio-util = "0.7.13"
+tracing = "0.1.41"
+tracing-subscriber = "0.3.19"
+thiserror = "2.0.11"
diff --git a/lampada/README.md b/lampada/README.md
new file mode 100644
index 0000000..dc5f016
--- /dev/null
+++ b/lampada/README.md
@@ -0,0 +1,3 @@
+# lampada
+
+a core xmpp client that graciously manages streams, delegating logic to an implementor of a trait.
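+
+for a feel of the shape of the trait, here is a minimal do-nothing implementor (a sketch only: `NoopClient` is hypothetical, the method set mirrors `Logic` in `src/lib.rs`, and rust 1.75+ is assumed so `async fn` can implement the `impl Future` trait methods):
+
+```rust
+use lampada::{error::ConnectionError, Connected, Logic, SupervisorSender};
+use stanza::client::Stanza;
+
+#[derive(Clone)]
+struct NoopClient;
+
+impl Logic for NoopClient {
+    type Cmd = ();
+
+    async fn handle_connect(self, _connection: Connected) {}
+
+    async fn handle_disconnect(self, _connection: Connected) {}
+
+    async fn handle_stanza(self, stanza: Stanza, _connection: Connected, _supervisor: SupervisorSender) {
+        // a real client would dispatch on the stanza type here
+        tracing::debug!("ignoring stanza: {:?}", stanza);
+    }
+
+    async fn handle_online(self, _command: Self::Cmd, _connection: Connected) {}
+
+    async fn handle_offline(self, _command: Self::Cmd) {}
+
+    async fn on_abort(self) {}
+
+    async fn handle_connection_error(self, error: ConnectionError) {
+        tracing::error!("connection error: {}", error);
+    }
+}
+```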
diff --git a/lampada/scratch b/lampada/scratch
new file mode 100644
index 0000000..e013ded
--- /dev/null
+++ b/lampada/scratch
@@ -0,0 +1,90 @@
+macaw/céu
+canopy/sol
+
+# logic:
+
+- db
+- pending iqs
+- ui update sender
+
+ ## logic methods
+
+ - handle_offline: called by lamp
+ - handle_online: called by lamp
+ - handle_stanza: called by read thread
+
+ - handle_connect: called by lamp
+ - handle_disconnect: called by lamp
+
+ - handle_error?: called by lamp handle threads and read thread for error logging
+ - handle_stream_error: called by supervisor when stream needs to be reset
+
+
+# lamp:
+
+- login jid (bare or full)
+- password provider
+- lamp command receiver
+- connected session state (connected, SupervisorHandle to disconnect current connected session)
+- connected state (if intended to be connected or not, for retrying reconnection every minute or something)
+- on_crash for connection supervisor
+- internal logic struct which has methods to handle logic commands
+
+# connected:
+
+- writehandle
+- current full jid for connected session
+
+# supervisor:
+
+- control_recv
+- read_thread_crash
+- write_thread_crash
+- read_control_handle
+- write_control_handle
+- on_crash
+- connected
+- password
+- logic
+
+# read:
+
+must be passed around when crash
+- supervisor_control
+- tasks
+
+can be cloned from supervisor
+- connected .
+- logic .
+
+can be recreated by supervisor
+- stream
+- disconnecting
+- disconnect_timedout
+- on_crash
+- control_recv
+
+# write:
+
+must be passed around when crash
+- stanza_recv
+
+can be recreated by supervisor
+- stream
+- on_crash
+- control_recv
+
+
+message types:
+
+command:
+
+- getroster
+- send message
+- etc.
+
+lamp commands:
+
+- connect
+- disconnect
+- command(command)
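+
+e.g. a sketch of the command nesting (names here hypothetical; the real
+outer type is CoreClientCommand in lib.rs, the inner one is whatever Cmd
+type the Logic implementor picks):
+
+enum Command { GetRoster, SendMessage(JID, Body) }
+
+enum LampCommand {
+    Connect,
+    Disconnect,
+    Command(Command),
+}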
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
new file mode 100644
index 0000000..1e767b0
--- /dev/null
+++ b/lampada/src/connection/mod.rs
@@ -0,0 +1,374 @@
+// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
+
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ sync::Arc,
+ time::Duration,
+};
+
+use jid::JID;
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use read::{ReadControl, ReadControlHandle, ReadState};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::{JoinHandle, JoinSet},
+};
+use tracing::info;
+use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
+
+use crate::{
+ error::{ConnectionError, WriteError},
+ Connected, Logic,
+};
+
+mod read;
+pub(crate) mod write;
+
+pub struct Supervisor<Lgc> {
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ // jid in connected stays the same over the life of the supervisor (the connection session)
+ connected: Connected,
+ password: Arc<String>,
+ logic: Lgc,
+}
+
+pub enum SupervisorCommand {
+ Disconnect,
+ // for if there was a stream error, require to reconnect
+ // couldn't stream errors just cause a crash? lol
+ Reconnect(ChildState),
+}
+
+pub enum ChildState {
+ Write(WriteState),
+ Read(ReadState),
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
+ fn new(
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ connected: Connected,
+ password: Arc<String>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ command_recv,
+ reader_crash,
+ writer_crash,
+ read_control_handle,
+ write_control_handle,
+ on_crash,
+ connected,
+ password,
+ logic,
+ }
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.command_recv.recv() => {
+ match msg {
+ SupervisorCommand::Disconnect => {
+ info!("disconnecting");
+ self.logic
+ .handle_disconnect(self.connected.clone())
+ .await;
+ let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
+ let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
+ info!("sent disconnect command");
+ tokio::select! {
+ _ = async { tokio::join!(
+ async { let _ = (&mut self.write_control_handle.handle).await; },
+ async { let _ = (&mut self.read_control_handle.handle).await; }
+ ) } => {},
+ // TODO: config timeout
+ _ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
+ self.read_control_handle.handle.abort();
+ self.write_control_handle.handle.abort();
+ }
+ }
+ info!("disconnected");
+ break;
+ },
+ // TODO: Reconnect without aborting, gentle reconnect.
+ SupervisorCommand::Reconnect(state) => {
+ // TODO: please omfg
+ // send abort to read stream, as already done, consider
+ let (read_state, mut write_state);
+ match state {
+ ChildState::Write(receiver) => {
+ write_state = receiver;
+ let (send, recv) = oneshot::channel();
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ read_state = state;
+ } else {
+ break
+ }
+ },
+ ChildState::Read(read) => {
+ read_state = read;
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ write_state = state;
+ } else {
+ break
+ }
+ },
+ }
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ self.write_control_handle =
+ WriteControlHandle::reconnect(write, send, write_state.stanza_recv);
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
+ write_state.stanza_recv.close();
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ }
+ },
+ Ok((write_msg, mut write_state)) = &mut self.writer_crash => {
+ // consider awaiting/aborting the read and write threads
+ let (send, recv) = oneshot::channel();
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ let read_state = tokio::select! {
+ Ok(s) = recv => s,
+ Ok(s) = &mut self.reader_crash => s,
+ // in case, just break as irrecoverable
+ else => break,
+ };
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: same here
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg);
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
+ write_state.stanza_recv.close();
+ let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error to send?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ Ok(read_state) = &mut self.reader_crash => {
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ let (retry_msg, mut write_state) = tokio::select! {
+ Ok(s) = recv => (None, s),
+ Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
+ // in case, just break as irrecoverable
+ else => break,
+ };
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ if let Some(msg) = retry_msg {
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg);
+ } else {
+ self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv)
+ }
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current messages with lost connection error.
+ write_state.stanza_recv.close();
+ if let Some(msg) = retry_msg {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ else => break,
+ }
+ }
+ // TODO: maybe don't just on_crash
+ let _ = self.on_crash.send(());
+ }
+}
+
+pub struct SupervisorHandle {
+ sender: SupervisorSender,
+ handle: JoinHandle<()>,
+}
+
+impl Deref for SupervisorHandle {
+ type Target = SupervisorSender;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+#[derive(Clone)]
+pub struct SupervisorSender {
+ sender: mpsc::Sender<SupervisorCommand>,
+}
+
+impl Deref for SupervisorSender {
+ type Target = mpsc::Sender<SupervisorCommand>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorSender {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl SupervisorHandle {
+ pub fn new<Lgc: Logic + Clone + Send + 'static>(
+ streams: BoundJabberStream<Tls>,
+ on_crash: oneshot::Sender<()>,
+ jid: JID,
+ password: Arc<String>,
+ logic: Lgc,
+ ) -> (WriteHandle, Self) {
+ let (command_send, command_recv) = mpsc::channel(20);
+ let (writer_crash_send, writer_crash_recv) = oneshot::channel();
+ let (reader_crash_send, reader_crash_recv) = oneshot::channel();
+
+ let (read_stream, write_stream) = streams.split();
+
+ let (write_handle, write_control_handle) =
+ WriteControlHandle::new(write_stream, writer_crash_send);
+
+ let connected = Connected {
+ jid,
+ write_handle: write_handle.clone(),
+ };
+
+ let supervisor_sender = SupervisorSender {
+ sender: command_send,
+ };
+
+ let read_control_handle = ReadControlHandle::new(
+ read_stream,
+ connected.clone(),
+ logic.clone(),
+ supervisor_sender.clone(),
+ reader_crash_send,
+ );
+
+ let actor = Supervisor::new(
+ command_recv,
+ reader_crash_recv,
+ writer_crash_recv,
+ read_control_handle,
+ write_control_handle,
+ on_crash,
+ connected,
+ password,
+ logic,
+ );
+
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ write_handle,
+ Self {
+ sender: supervisor_sender,
+ handle,
+ },
+ )
+ }
+
+ pub fn sender(&self) -> SupervisorSender {
+ self.sender.clone()
+ }
+}
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
new file mode 100644
index 0000000..cc69387
--- /dev/null
+++ b/lampada/src/connection/read.rs
@@ -0,0 +1,233 @@
+use std::{
+ collections::HashMap,
+ marker::PhantomData,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::{JoinHandle, JoinSet},
+};
+use tracing::info;
+
+use crate::{Connected, Logic};
+
+use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
+
+/// read actor
+pub struct Read<Lgc> {
+ stream: BoundJabberReader<Tls>,
+ disconnecting: bool,
+ disconnect_timedout: oneshot::Receiver<()>,
+
+ // all the threads spawned by the current connection session
+ tasks: JoinSet<()>,
+
+ // for handling incoming stanzas
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
+pub struct ReadState {
+ pub supervisor_control: SupervisorSender,
+ // TODO: when a stream dies, iq gets received from the server on the old stream should not be replied to on the new stream
+ pub tasks: JoinSet<()>,
+}
+
+impl<Lgc> Read<Lgc> {
+ fn new(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (_send, recv) = oneshot::channel();
+ Self {
+ stream,
+ disconnecting: false,
+ disconnect_timedout: recv,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ }
+ }
+}
+
+impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+ async fn run(mut self) {
+ println!("started read thread");
+ // let stanza = self.stream.read::<Stanza>().await;
+ // println!("{:?}", stanza);
+ loop {
+ tokio::select! {
+ // if still haven't received the end tag in time, just kill itself
+ // TODO: is this okay??? what if notification thread dies?
+ // guard keeps the initial placeholder receiver (its sender is dropped in new()) from being polled again after completing, which would panic
+ Ok(()) = &mut self.disconnect_timedout, if self.disconnecting => {
+ info!("disconnect_timedout");
+ break;
+ }
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ // when disconnect received,
+ ReadControl::Disconnect => {
+ let (send, recv) = oneshot::channel();
+ self.disconnect_timedout = recv;
+ self.disconnecting = true;
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ let _ = send.send(());
+ });
+ },
+ ReadControl::Abort(sender) => {
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ break;
+ },
+ };
+ },
+ s = self.stream.read::<Stanza>() => {
+ println!("read stanza");
+ match s {
+ Ok(s) => {
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
+ },
+ Err(e) => {
+ println!("error: {:?}", e);
+ // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
+ // match e {
+ // peanuts::Error::ReadError(error) => todo!(),
+ // peanuts::Error::Utf8Error(utf8_error) => todo!(),
+ // peanuts::Error::ParseError(_) => todo!(),
+ // peanuts::Error::EntityProcessError(_) => todo!(),
+ // peanuts::Error::InvalidCharRef(_) => todo!(),
+ // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
+ // peanuts::Error::DuplicateAttribute(_) => todo!(),
+ // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
+ // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
+ // peanuts::Error::NotInElement(_) => todo!(),
+ // peanuts::Error::ExtraData(_) => todo!(),
+ // peanuts::Error::UndeclaredNamespace(_) => todo!(),
+ // peanuts::Error::IncorrectName(name) => todo!(),
+ // peanuts::Error::DeserializeError(_) => todo!(),
+ // peanuts::Error::Deserialize(deserialize_error) => todo!(),
+ // peanuts::Error::RootElementEnded => todo!(),
+ // }
+ // TODO: make sure this only happens when an end tag is received
+ if self.disconnecting {
+ break;
+ } else {
+ let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ }
+ break;
+ },
+ }
+ },
+ else => break
+ }
+ }
+ println!("stopping read thread");
+ self.logic.on_abort().await;
+ }
+}
+
+// what do stanza processes do?
+// - update ui
+// - access database
+// - disconnect proper, reconnect
+// - respond to server requests
+
+pub enum ReadControl {
+ Disconnect,
+ Abort(oneshot::Sender<ReadState>),
+}
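+
+// hedged sketch (not in the original) of the abort handshake as the
+// supervisor drives it in connection/mod.rs; `read_control_handle` derefs
+// to the underlying control sender:
+//
+//     let (send, recv) = oneshot::channel();
+//     let _ = read_control_handle.send(ReadControl::Abort(send)).await;
+//     let ReadState { supervisor_control, tasks } = recv.await?;
+//     // the tasks JoinSet is then reused via ReadControlHandle::reconnect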
+
+pub struct ReadControlHandle {
+ sender: mpsc::Sender<ReadControl>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for ReadControlHandle {
+ type Target = mpsc::Sender<ReadControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for ReadControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl ReadControlHandle {
+ pub fn new<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ JoinSet::new(),
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
new file mode 100644
index 0000000..8f0c34b
--- /dev/null
+++ b/lampada/src/connection/write.rs
@@ -0,0 +1,258 @@
+use std::ops::{Deref, DerefMut};
+
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::JoinHandle,
+};
+
+use crate::error::WriteError;
+
+/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
+pub struct Write {
+ stream: BoundJabberWriter<Tls>,
+
+ /// connection session write queue
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<WriteControl>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue
+pub struct WriteState {
+ pub stanza_recv: mpsc::Receiver<WriteMessage>,
+}
+
+#[derive(Debug)]
+pub struct WriteMessage {
+ pub stanza: Stanza,
+ pub respond_to: oneshot::Sender<Result<(), WriteError>>,
+}
+
+pub enum WriteControl {
+ Disconnect,
+ Abort(oneshot::Sender<WriteState>),
+}
+
+impl Write {
+ fn new(
+ stream: BoundJabberWriter<Tls>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ control_receiver: mpsc::Receiver<WriteControl>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ ) -> Self {
+ Self {
+ stream,
+ stanza_receiver,
+ control_receiver,
+ on_crash,
+ }
+ }
+
+ async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
+ Ok(self.stream.write(stanza).await?)
+ }
+
+ async fn run_reconnected(mut self, retry_msg: WriteMessage) {
+ // try to retry sending the message that failed to send previously
+ let result = self.stream.write(&retry_msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ // TODO: upon reconnect, make sure we are not stuck in a reconnection loop
+ let _ = self.on_crash.send((
+ retry_msg,
+ WriteState {
+ stanza_recv: self.stanza_receiver,
+ },
+ ));
+ return;
+ }
+ _ => {
+ let _ = retry_msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = retry_msg.respond_to.send(Ok(()));
+ }
+ }
+ // return to normal loop
+ self.run().await
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ WriteControl::Disconnect => {
+ // close the stanza_receiver channel and drain out all of the remaining stanzas to send
+ self.stanza_receiver.close();
+ // TODO: put this in some kind of function to avoid code duplication
+ while let Some(msg) = self.stanza_receiver.recv().await {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // if connection lost during disconnection, just send lost connection error to the write requests
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ while let Some(msg) = self.stanza_receiver.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ break;
+ }
+ // otherwise complete sending all the stanzas currently in the queue
+ _ => {
+ let _ = msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = msg.respond_to.send(Ok(()));
+ }
+ }
+ }
+ let _ = self.stream.try_close().await;
+ break;
+ },
+ // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
+ WriteControl::Abort(sender) => {
+ let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });
+ break;
+ },
+ }
+ },
+ Some(msg) = self.stanza_receiver.recv() => {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver }));
+ break;
+ }
+ _ => {
+ let _ = msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = msg.respond_to.send(Ok(()));
+ }
+ }
+ },
+ else => break,
+ }
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct WriteHandle {
+ sender: mpsc::Sender<WriteMessage>,
+}
+
+impl WriteHandle {
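+ /// hedged usage sketch (not from the original docs), assuming a
+ /// `Connected` obtained in `Logic::handle_connect` and an already-built
+ /// `stanza`:
+ ///
+ /// ```ignore
+ /// match connected.write_handle().write(stanza).await {
+ ///     Ok(()) => tracing::debug!("stanza written to the stream"),
+ ///     Err(e) => tracing::warn!("write failed: {}", e),
+ /// }
+ /// ```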
+ pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> {
+ let (send, recv) = oneshot::channel();
+ self.send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await
+ .map_err(|e| WriteError::Actor(e.into()))?;
+ // TODO: timeout
+ recv.await.map_err(|e| WriteError::Actor(e.into()))?
+ }
+}
+
+impl Deref for WriteHandle {
+ type Target = mpsc::Sender<WriteMessage>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+pub struct WriteControlHandle {
+ sender: mpsc::Sender<WriteControl>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for WriteControlHandle {
+ type Target = mpsc::Sender<WriteControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl WriteControlHandle {
+ pub fn new(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ ) -> (WriteHandle, Self) {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+ let (stanza_sender, stanza_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ WriteHandle {
+ sender: stanza_sender,
+ },
+ Self {
+ sender: control_sender,
+ handle,
+ },
+ )
+ }
+
+ pub fn reconnect_retry(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ retry_msg: WriteMessage,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
diff --git a/lampada/src/error.rs b/lampada/src/error.rs
new file mode 100644
index 0000000..cdfb4db
--- /dev/null
+++ b/lampada/src/error.rs
@@ -0,0 +1,82 @@
+use std::sync::Arc;
+
+use stanza::client::Stanza;
+use thiserror::Error;
+use tokio::{
+ sync::{mpsc::error::SendError, oneshot::error::RecvError},
+ time::error::Elapsed,
+};
+
+#[derive(Debug, Error, Clone)]
+pub enum ConnectionError {
+ #[error("connection failed: {0}")]
+ ConnectionFailed(#[from] luz::Error),
+ #[error("already connected")]
+ AlreadyConnected,
+ #[error("already disconnected")]
+ AlreadyDisconnected,
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: Display for Content
+ #[error("disconnected")]
+ Disconnected,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum CommandError<T> {
+ #[error("actor: {0}")]
+ Actor(ActorError),
+ #[error("{0}")]
+ Error(#[from] T),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum WriteError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: should this be in writeerror or separate?
+ #[error("actor: {0}")]
+ Actor(#[from] ActorError),
+ #[error("disconnected")]
+ Disconnected,
+}
+
+// TODO: separate peanuts read and write error?
+// TODO: which crate
+#[derive(Debug, Error, Clone)]
+pub enum ReadError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum ActorError {
+ #[error("receive timed out")]
+ Timeout,
+ #[error("could not send message to actor, channel closed")]
+ Send,
+ #[error("could not receive message from actor, channel closed")]
+ Receive,
+}
+
+impl From<Elapsed> for ActorError {
+ fn from(_e: Elapsed) -> Self {
+ Self::Timeout
+ }
+}
+
+impl<T> From<SendError<T>> for ActorError {
+ fn from(_e: SendError<T>) -> Self {
+ Self::Send
+ }
+}
+
+impl From<RecvError> for ActorError {
+ fn from(_e: RecvError) -> Self {
+ Self::Receive
+ }
+}
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
new file mode 100644
index 0000000..c61c596
--- /dev/null
+++ b/lampada/src/lib.rs
@@ -0,0 +1,238 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+pub use connection::write::WriteMessage;
+pub use connection::SupervisorSender;
+use error::ConnectionError;
+use futures::{future::Fuse, FutureExt};
+use luz::JID;
+use stanza::client::{
+ iq::{self, Iq, IqType},
+ Stanza,
+};
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::JoinSet,
+ time::timeout,
+};
+use tracing::{debug, info};
+
+use crate::connection::write::WriteHandle;
+use crate::connection::{SupervisorCommand, SupervisorHandle};
+
+mod connection;
+pub mod error;
+
+#[derive(Clone)]
+pub struct Connected {
+ // full jid will stay stable across reconnections
+ jid: JID,
+ write_handle: WriteHandle,
+}
+
+impl Connected {
+ pub fn jid(&self) -> &JID {
+ &self.jid
+ }
+
+ pub fn write_handle(&self) -> &WriteHandle {
+ &self.write_handle
+ }
+}
+
+/// everything that a particular xmpp client must implement
+pub trait Logic {
+ /// the command message type
+ type Cmd;
+
+ /// run after binding to the stream (e.g. for a chat client, publish initial presence and fetch the roster)
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ fn handle_disconnect(
+ self,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run to handle an incoming xmpp stanza
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when a connection is currently established
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when disconnected
+ fn handle_offline(
+ self,
+ command: Self::Cmd,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// handle connection errors from the core client logic
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ // async fn handle_stream_error(self, error) {}
+}
+
+/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
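+///
+/// hedged construction sketch (not from the original docs; assumes some
+/// `logic: Lgc` and a parsed `jid`; the initial shutdown receiver is a
+/// dummy, fused so it is safe to keep polling):
+///
+/// ```ignore
+/// use futures::FutureExt;
+/// use tokio::sync::{mpsc, oneshot};
+///
+/// let (cmd_send, cmd_recv) = mpsc::channel(20);
+/// let (_dead_send, dead_recv) = oneshot::channel();
+/// let client = CoreClient::new(jid, password, cmd_recv, None, dead_recv.fuse(), logic);
+/// tokio::spawn(client.run());
+/// cmd_send.send(CoreClientCommand::Connect).await?;
+/// ```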
+pub struct CoreClient<Lgc: Logic> {
+ jid: JID,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ password: Arc<String>,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // connected_intention: bool,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ // config: LampConfig,
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
+ tasks: JoinSet<()>,
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+ /// create a new actor
+ pub fn new(
+ jid: JID,
+ password: String,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ jid,
+ password: Arc::new(password),
+ connected,
+ receiver,
+ connection_supervisor_shutdown,
+ logic,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ /// run the actor
+ pub async fn run(mut self) {
+ loop {
+ let msg = tokio::select! {
+ // when created, the supervisor (and connection) doesn't exist yet, so this receiver starts out completed; it is fused, and a fused future returns Pending after completion, so re-polling it here is safe
+ _ = &mut self.connection_supervisor_shutdown => {
+ self.connected = None;
+ continue;
+ }
+ Some(msg) = self.receiver.recv() => {
+ msg
+ },
+ else => break,
+ };
+ match msg {
+ CoreClientCommand::Connect => {
+ match self.connected {
+ Some(_) => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyConnected)
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ luz::connect_and_login(&mut jid, &*self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ debug!("ok stream result");
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ shutdown_send,
+ jid.clone(),
+ self.password.clone(),
+ self.logic.clone(),
+ );
+
+ let shutdown_recv = shutdown_recv.fuse();
+ self.connection_supervisor_shutdown = shutdown_recv;
+
+ let connected = Connected {
+ jid,
+ write_handle: writer,
+ };
+
+ self.logic.clone().handle_connect(connected.clone()).await;
+
+ self.connected = Some((connected, supervisor));
+ }
+ Err(e) => {
+ tracing::error!("error: {}", e);
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::ConnectionFailed(
+ e.into(),
+ ))
+ .await;
+ }
+ }
+ }
+ };
+ }
+ CoreClientCommand::Disconnect => match self.connected {
+ None => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyDisconnected)
+ .await;
+ }
+ ref mut c => {
+ if let Some((connected, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ CoreClientCommand::Command(command) => {
+ match self.connected.as_ref() {
+ Some((w, s)) => self
+ .tasks
+ .spawn(self.logic.clone().handle_online(command, w.clone())),
+ None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ };
+ }
+ }
+ }
+ }
+}
+
+// TODO: generate methods for each with a macro
+pub enum CoreClientCommand<C> {
+ // TODO: login invisible xep-0186
+ /// connect to XMPP chat server. gets roster and publishes initial presence.
+ Connect,
+ /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
+ Disconnect,
+ /// client-specific command, delegated to the `Logic` implementor
+ Command(C),
+}
diff --git a/lampada/src/main.rs b/lampada/src/main.rs
new file mode 100644
index 0000000..7b7469d
--- /dev/null
+++ b/lampada/src/main.rs
@@ -0,0 +1,42 @@
+use std::{path::Path, str::FromStr, time::Duration};
+
+use jid::JID;
+use lampada::{db::Db, CoreClientCommand, LuzHandle};
+use sqlx::SqlitePool;
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt},
+ sync::oneshot,
+};
+use tracing::info;
+
+#[tokio::main]
+async fn main() {
+ tracing_subscriber::fmt::init();
+ let db = Db::create_connect_and_migrate(Path::new("./luz.db"))
+ .await
+ .unwrap();
+ let (luz, mut recv) =
+ LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
+
+ tokio::spawn(async move {
+ while let Some(msg) = recv.recv().await {
+ info!("{:#?}", msg)
+ }
+ });
+
+ luz.send(CoreClientCommand::Connect).await.unwrap();
+ let (send, recv) = oneshot::channel();
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ info!("sending message");
+ luz.send(CoreClientCommand::SendMessage(
+ JID::from_str("cel@blos.sm").unwrap(),
+ luz::chat::Body {
+ body: "hallo!!!".to_string(),
+ },
+ send,
+ ))
+ .await
+ .unwrap();
+ recv.await.unwrap().unwrap();
+ println!("sent message");
+}