diff options
author | 2025-03-26 14:29:40 +0000 | |
---|---|---|
committer | 2025-03-26 14:29:40 +0000 | |
commit | 2211f324782cdc617b4b5ecd071178e372539fe4 (patch) | |
tree | a5ea5ce11d748424447dee23173d3cb8aec648ea /lampada | |
parent | 2f8671978e18c1e1e7834056ae674f32fbde3868 (diff) | |
download | luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2 luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip |
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to 'lampada')
-rw-r--r-- | lampada/.gitignore | 2 | ||||
-rw-r--r-- | lampada/Cargo.toml | 17 | ||||
-rw-r--r-- | lampada/README.md | 3 | ||||
-rw-r--r-- | lampada/scratch | 90 | ||||
-rw-r--r-- | lampada/src/connection/mod.rs | 374 | ||||
-rw-r--r-- | lampada/src/connection/read.rs | 233 | ||||
-rw-r--r-- | lampada/src/connection/write.rs | 258 | ||||
-rw-r--r-- | lampada/src/error.rs | 82 | ||||
-rw-r--r-- | lampada/src/lib.rs | 238 | ||||
-rw-r--r-- | lampada/src/main.rs | 42 |
10 files changed, 1339 insertions, 0 deletions
diff --git a/lampada/.gitignore b/lampada/.gitignore new file mode 100644 index 0000000..60868fd --- /dev/null +++ b/lampada/.gitignore @@ -0,0 +1,2 @@ +luz.db +.sqlx/ diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml new file mode 100644 index 0000000..856fd7d --- /dev/null +++ b/lampada/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "lampada" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures = "0.3.31" +luz = { version = "0.1.0", path = "../luz" } +peanuts = { version = "0.1.0", path = "../../peanuts" } +jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } +tokio = "1.42.0" +tokio-stream = "0.1.17" +tokio-util = "0.7.13" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +thiserror = "2.0.11" diff --git a/lampada/README.md b/lampada/README.md new file mode 100644 index 0000000..dc5f016 --- /dev/null +++ b/lampada/README.md @@ -0,0 +1,3 @@ +# lampada + +a core xmpp client that graciously manages streams, delegating logic to an implementor of a trait. diff --git a/lampada/scratch b/lampada/scratch new file mode 100644 index 0000000..e013ded --- /dev/null +++ b/lampada/scratch @@ -0,0 +1,90 @@ +macaw/céu +canopy/sol + +# logic: + +- db +- pending iqs +- ui update sender + + ## logic methods + + - handle_offline: called by lamp + - handle_online: called by lamp + - handle_stanza: called by read thread + + - handle_connect: called by lamp + - handle_disconnect: called by lamp + + - handle_error?: called by lamp handle threads and read thread for error logging + - handle_stream_error: called by supervisor when stream needs to be reset + + +# lamp: + +- login jid (bare or full) +- password provider +- lamp command receiver +- connected session state(connected, supervisorhandle to disconnect current connected session) +- connected state (if intended to be connected or not, for retrying reconnection every minute or something) +- on_crash for connection supervisor +- internal logic struct which has methods to handle logic commands + +# connected: + +- writehandle +- current full jid for connected session + +# supervisor: + +- control_recv +- read_thread_crash +- write_thread_crash +- read_control_handle +- write_control_handle +- on_crash +- connected +- password +- logic + +# read: + +must be passed around when crash +- supervisor_control +- tasks + +can be cloned from supervisor +- connected . +- logic . + +can be recreated by supervisor +- stream +- disconnecting +- disconnect_timedout +- on_crash +- control_recv + +# write: + +must be passed around when crash +- stanza_recv + +can be recreated by supervisor +- stream +- on_crash +- control_recv + + +message types: + +command: + +- getroster +- send message +- etc. + +lamp commands: + +- connect +- disconnect +- command(command) diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs new file mode 100644 index 0000000..1e767b0 --- /dev/null +++ b/lampada/src/connection/mod.rs @@ -0,0 +1,374 @@ +// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly + +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; + +use jid::JID; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use read::{ReadControl, ReadControlHandle, ReadState}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::{JoinHandle, JoinSet}, +}; +use tracing::info; +use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; + +use crate::{ + error::{ConnectionError, WriteError}, + Connected, Logic, +}; + +mod read; +pub(crate) mod write; + +pub struct Supervisor<Lgc> { + command_recv: mpsc::Receiver<SupervisorCommand>, + reader_crash: oneshot::Receiver<ReadState>, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + // jid in connected stays the same over the life of the supervisor (the connection session) + connected: Connected, + password: Arc<String>, + logic: Lgc, +} + +pub enum SupervisorCommand { + Disconnect, + // for if there was a stream error, require to reconnect + // couldn't stream errors just cause a crash? lol + Reconnect(ChildState), +} + +pub enum ChildState { + Write(WriteState), + Read(ReadState), +} + +impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { + fn new( + command_recv: mpsc::Receiver<SupervisorCommand>, + reader_crash: oneshot::Receiver<ReadState>, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + connected: Connected, + password: Arc<String>, + logic: Lgc, + ) -> Self { + Self { + command_recv, + reader_crash, + writer_crash, + read_control_handle, + write_control_handle, + on_crash, + connected, + password, + logic, + } + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.command_recv.recv() => { + match msg { + SupervisorCommand::Disconnect => { + info!("disconnecting"); + self.logic + .handle_disconnect(self.connected.clone()) + .await; + let _ = self.write_control_handle.send(WriteControl::Disconnect).await; + let _ = self.read_control_handle.send(ReadControl::Disconnect).await; + info!("sent disconnect command"); + tokio::select! { + _ = async { tokio::join!( + async { let _ = (&mut self.write_control_handle.handle).await; }, + async { let _ = (&mut self.read_control_handle.handle).await; } + ) } => {}, + // TODO: config timeout + _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { + (&mut self.read_control_handle.handle).abort(); + (&mut self.write_control_handle.handle).abort(); + } + } + info!("disconnected"); + break; + }, + // TODO: Reconnect without aborting, gentle reconnect. + SupervisorCommand::Reconnect(state) => { + // TODO: please omfg + // send abort to read stream, as already done, consider + let (read_state, mut write_state); + match state { + ChildState::Write(receiver) => { + write_state = receiver; + let (send, recv) = oneshot::channel(); + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + read_state = state; + } else { + break + } + }, + ChildState::Read(read) => { + read_state = read; + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + write_state = state; + } else { + break + } + }, + } + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + self.write_control_handle = + WriteControlHandle::reconnect(write, send, write_state.stanza_recv); + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. + write_state.stanza_recv.close(); + while let Some(msg) = write_state.stanza_recv.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + } + }, + Ok((write_msg, mut write_state)) = &mut self.writer_crash => { + // consider awaiting/aborting the read and write threads + let (send, recv) = oneshot::channel(); + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + let read_state = tokio::select! { + Ok(s) = recv => s, + Ok(s) = &mut self.reader_crash => s, + // in case, just break as irrecoverable + else => break, + }; + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: same here + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg); + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. + write_state.stanza_recv.close(); + let _ = write_msg.respond_to.send(Err(WriteError::LostConnection)); + while let Some(msg) = write_state.stanza_recv.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error to send? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + Ok(read_state) = &mut self.reader_crash => { + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + let (retry_msg, mut write_state) = tokio::select! { + Ok(s) = recv => (None, s), + Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), + // in case, just break as irrecoverable + else => break, + }; + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + if let Some(msg) = retry_msg { + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg); + } else { + self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv) + } + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current messages with lost connection error. + write_state.stanza_recv.close(); + if let Some(msg) = retry_msg { + msg.respond_to.send(Err(WriteError::LostConnection)); + } + while let Some(msg) = write_state.stanza_recv.recv().await { + msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + else => break, + } + } + // TODO: maybe don't just on_crash + let _ = self.on_crash.send(()); + } +} + +pub struct SupervisorHandle { + sender: SupervisorSender, + handle: JoinHandle<()>, +} + +impl Deref for SupervisorHandle { + type Target = SupervisorSender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +#[derive(Clone)] +pub struct SupervisorSender { + sender: mpsc::Sender<SupervisorCommand>, +} + +impl Deref for SupervisorSender { + type Target = mpsc::Sender<SupervisorCommand>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl SupervisorHandle { + pub fn new<Lgc: Logic + Clone + Send + 'static>( + streams: BoundJabberStream<Tls>, + on_crash: oneshot::Sender<()>, + jid: JID, + password: Arc<String>, + logic: Lgc, + ) -> (WriteHandle, Self) { + let (command_send, command_recv) = mpsc::channel(20); + let (writer_crash_send, writer_crash_recv) = oneshot::channel(); + let (reader_crash_send, reader_crash_recv) = oneshot::channel(); + + let (read_stream, write_stream) = streams.split(); + + let (write_handle, write_control_handle) = + WriteControlHandle::new(write_stream, writer_crash_send); + + let connected = Connected { + jid, + write_handle: write_handle.clone(), + }; + + let supervisor_sender = SupervisorSender { + sender: command_send, + }; + + let read_control_handle = ReadControlHandle::new( + read_stream, + connected.clone(), + logic.clone(), + supervisor_sender.clone(), + reader_crash_send, + ); + + let actor = Supervisor::new( + command_recv, + reader_crash_recv, + writer_crash_recv, + read_control_handle, + write_control_handle, + on_crash, + connected, + password, + logic, + ); + + let handle = tokio::spawn(async move { actor.run().await }); + + ( + write_handle, + Self { + sender: supervisor_sender, + handle, + }, + ) + } + + pub fn sender(&self) -> SupervisorSender { + self.sender.clone() + } +} diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs new file mode 100644 index 0000000..cc69387 --- /dev/null +++ b/lampada/src/connection/read.rs @@ -0,0 +1,233 @@ +use std::{ + collections::HashMap, + marker::PhantomData, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::{JoinHandle, JoinSet}, +}; +use tracing::info; + +use crate::{Connected, Logic}; + +use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; + +/// read actor +pub struct Read<Lgc> { + stream: BoundJabberReader<Tls>, + disconnecting: bool, + disconnect_timedout: oneshot::Receiver<()>, + + // all the threads spawned by the current connection session + tasks: JoinSet<()>, + + // for handling incoming stanzas + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + + // control stuff + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue +pub struct ReadState { + pub supervisor_control: SupervisorSender, + // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream + pub tasks: JoinSet<()>, +} + +impl<Lgc> Read<Lgc> { + fn new( + stream: BoundJabberReader<Tls>, + tasks: JoinSet<()>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (_send, recv) = oneshot::channel(); + Self { + stream, + disconnecting: false, + disconnect_timedout: recv, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + } + } +} + +impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { + async fn run(mut self) { + println!("started read thread"); + // let stanza = self.stream.read::<Stanza>().await; + // println!("{:?}", stanza); + loop { + tokio::select! { + // if still haven't received the end tag in time, just kill itself + // TODO: is this okay??? what if notification thread dies? + Ok(()) = &mut self.disconnect_timedout => { + info!("disconnect_timedout"); + break; + } + Some(msg) = self.control_receiver.recv() => { + match msg { + // when disconnect received, + ReadControl::Disconnect => { + let (send, recv) = oneshot::channel(); + self.disconnect_timedout = recv; + self.disconnecting = true; + tokio::spawn(async { + tokio::time::sleep(Duration::from_secs(10)).await; + let _ = send.send(()); + }) + }, + ReadControl::Abort(sender) => { + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + break; + }, + }; + }, + s = self.stream.read::<Stanza>() => { + println!("read stanza"); + match s { + Ok(s) => { + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); + }, + Err(e) => { + println!("error: {:?}", e); + // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true + // match e { + // peanuts::Error::ReadError(error) => todo!(), + // peanuts::Error::Utf8Error(utf8_error) => todo!(), + // peanuts::Error::ParseError(_) => todo!(), + // peanuts::Error::EntityProcessError(_) => todo!(), + // peanuts::Error::InvalidCharRef(_) => todo!(), + // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), + // peanuts::Error::DuplicateAttribute(_) => todo!(), + // peanuts::Error::UnqualifiedNamespace(_) => todo!(), + // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), + // peanuts::Error::NotInElement(_) => todo!(), + // peanuts::Error::ExtraData(_) => todo!(), + // peanuts::Error::UndeclaredNamespace(_) => todo!(), + // peanuts::Error::IncorrectName(name) => todo!(), + // peanuts::Error::DeserializeError(_) => todo!(), + // peanuts::Error::Deserialize(deserialize_error) => todo!(), + // peanuts::Error::RootElementEnded => todo!(), + // } + // TODO: make sure this only happens when an end tag is received + if self.disconnecting == true { + break; + } else { + let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + } + break; + }, + } + }, + else => break + } + } + println!("stopping read thread"); + self.logic.on_abort().await; + } +} + +// what do stanza processes do? +// - update ui +// - access database +// - disconnect proper, reconnect +// - respond to server requests + +pub enum ReadControl { + Disconnect, + Abort(oneshot::Sender<ReadState>), +} + +pub struct ReadControlHandle { + sender: mpsc::Sender<ReadControl>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for ReadControlHandle { + type Target = mpsc::Sender<ReadControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for ReadControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl ReadControlHandle { + pub fn new<Lgc: Clone + Logic + Send + 'static>( + stream: BoundJabberReader<Tls>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + stream, + JoinSet::new(), + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( + stream: BoundJabberReader<Tls>, + tasks: JoinSet<()>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + stream, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs new file mode 100644 index 0000000..8f0c34b --- /dev/null +++ b/lampada/src/connection/write.rs @@ -0,0 +1,258 @@ +use std::ops::{Deref, DerefMut}; + +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; + +use crate::error::WriteError; + +/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +pub struct Write { + stream: BoundJabberWriter<Tls>, + + /// connection session write queue + stanza_receiver: mpsc::Receiver<WriteMessage>, + + // control stuff + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue +pub struct WriteState { + pub stanza_recv: mpsc::Receiver<WriteMessage>, +} + +#[derive(Debug)] +pub struct WriteMessage { + pub stanza: Stanza, + pub respond_to: oneshot::Sender<Result<(), WriteError>>, +} + +pub enum WriteControl { + Disconnect, + Abort(oneshot::Sender<WriteState>), +} + +impl Write { + fn new( + stream: BoundJabberWriter<Tls>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> Self { + Self { + stream, + stanza_receiver, + control_receiver, + on_crash, + } + } + + async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { + Ok(self.stream.write(stanza).await?) + } + + async fn run_reconnected(mut self, retry_msg: WriteMessage) { + // try to retry sending the message that failed to send previously + let result = self.stream.write(&retry_msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + // TODO: upon reconnect, make sure we are not stuck in a reconnection loop + let _ = self.on_crash.send(( + retry_msg, + WriteState { + stanza_recv: self.stanza_receiver, + }, + )); + return; + } + _ => { + let _ = retry_msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = retry_msg.respond_to.send(Ok(())); + } + } + // return to normal loop + self.run().await + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + WriteControl::Disconnect => { + // close the stanza_receiver channel and drain out all of the remaining stanzas to send + self.stanza_receiver.close(); + // TODO: put this in some kind of function to avoid code duplication + while let Some(msg) = self.stanza_receiver.recv().await { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // if connection lost during disconnection, just send lost connection error to the write requests + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + while let Some(msg) = self.stanza_receiver.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + break; + } + // otherwise complete sending all the stanzas currently in the queue + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + } + let _ = self.stream.try_close().await; + break; + }, + // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource + WriteControl::Abort(sender) => { + let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); + break; + }, + } + }, + Some(msg) = self.stanza_receiver.recv() => { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver })); + break; + } + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + }, + else => break, + } + } + } +} + +#[derive(Clone)] +pub struct WriteHandle { + sender: mpsc::Sender<WriteMessage>, +} + +impl WriteHandle { + pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> { + let (send, recv) = oneshot::channel(); + self.send(WriteMessage { + stanza, + respond_to: send, + }) + .await + .map_err(|e| WriteError::Actor(e.into()))?; + // TODO: timeout + recv.await.map_err(|e| WriteError::Actor(e.into()))? + } +} + +impl Deref for WriteHandle { + type Target = mpsc::Sender<WriteMessage>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +pub struct WriteControlHandle { + sender: mpsc::Sender<WriteControl>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for WriteControlHandle { + type Target = mpsc::Sender<WriteControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl WriteControlHandle { + pub fn new( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> (WriteHandle, Self) { + let (control_sender, control_receiver) = mpsc::channel(20); + let (stanza_sender, stanza_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + ( + WriteHandle { + sender: stanza_sender, + }, + Self { + sender: control_sender, + handle, + }, + ) + } + + pub fn reconnect_retry( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + retry_msg: WriteMessage, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} diff --git a/lampada/src/error.rs b/lampada/src/error.rs new file mode 100644 index 0000000..cdfb4db --- /dev/null +++ b/lampada/src/error.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use stanza::client::Stanza; +use thiserror::Error; +use tokio::{ + sync::{mpsc::error::SendError, oneshot::error::RecvError}, + time::error::Elapsed, +}; + +#[derive(Debug, Error, Clone)] +pub enum ConnectionError { + #[error("connection failed: {0}")] + ConnectionFailed(#[from] luz::Error), + #[error("already connected")] + AlreadyConnected, + #[error("already disconnected")] + AlreadyDisconnected, + #[error("lost connection")] + LostConnection, + // TODO: Display for Content + #[error("disconnected")] + Disconnected, +} + +#[derive(Debug, Error, Clone)] +pub enum CommandError<T> { + #[error("actor: {0}")] + Actor(ActorError), + #[error("{0}")] + Error(#[from] T), +} + +#[derive(Debug, Error, Clone)] +pub enum WriteError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, + // TODO: should this be in writeerror or separate? + #[error("actor: {0}")] + Actor(#[from] ActorError), + #[error("disconnected")] + Disconnected, +} + +// TODO: separate peanuts read and write error? +// TODO: which crate +#[derive(Debug, Error, Clone)] +pub enum ReadError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, +} + +#[derive(Debug, Error, Clone)] +pub enum ActorError { + #[error("receive timed out")] + Timeout, + #[error("could not send message to actor, channel closed")] + Send, + #[error("could not receive message from actor, channel closed")] + Receive, +} + +impl From<Elapsed> for ActorError { + fn from(_e: Elapsed) -> Self { + Self::Timeout + } +} + +impl<T> From<SendError<T>> for ActorError { + fn from(_e: SendError<T>) -> Self { + Self::Send + } +} + +impl From<RecvError> for ActorError { + fn from(_e: RecvError) -> Self { + Self::Receive + } +} diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs new file mode 100644 index 0000000..c61c596 --- /dev/null +++ b/lampada/src/lib.rs @@ -0,0 +1,238 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +pub use connection::write::WriteMessage; +pub use connection::SupervisorSender; +use error::ConnectionError; +use futures::{future::Fuse, FutureExt}; +use luz::JID; +use stanza::client::{ + iq::{self, Iq, IqType}, + Stanza, +}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::JoinSet, + time::timeout, +}; +use tracing::{debug, info}; + +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; + +mod connection; +pub mod error; + +#[derive(Clone)] +pub struct Connected { + // full jid will stay stable across reconnections + jid: JID, + write_handle: WriteHandle, +} + +impl Connected { + pub fn jid(&self) -> &JID { + &self.jid + } + + pub fn write_handle(&self) -> &WriteHandle { + &self.write_handle + } +} + +/// everything that a particular xmpp client must implement +pub trait Logic { + /// the command message type + type Cmd; + + /// run after binding to the stream (e.g. for a chat client, ) + fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send; + + /// run before closing the stream (e.g. send unavailable presence in a chat client) + fn handle_disconnect( + self, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + Send; + + /// run to handle an incoming xmpp stanza + fn handle_stanza( + self, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when a connection is currently established + fn handle_online( + self, + command: Self::Cmd, + connection: Connected, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run to handle a command message when disconnected + fn handle_offline( + self, + command: Self::Cmd, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) + fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send; + + /// handle connection errors from the core client logic + fn handle_connection_error( + self, + error: ConnectionError, + ) -> impl std::future::Future<Output = ()> + std::marker::Send; + + // async fn handle_stream_error(self, error) {} +} + +/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core +pub struct CoreClient<Lgc: Logic> { + jid: JID, + // TODO: use a dyn passwordprovider trait to avoid storing password in memory + password: Arc<String>, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // connected_intention: bool, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + // config: LampConfig, + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? + tasks: JoinSet<()>, +} + +impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { + /// create a new actor + pub fn new( + jid: JID, + password: String, + receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>, + connected: Option<(Connected, SupervisorHandle)>, + connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>, + logic: Lgc, + ) -> Self { + Self { + jid, + password: Arc::new(password), + connected, + receiver, + connection_supervisor_shutdown, + logic, + tasks: JoinSet::new(), + } + } + + /// run the actor + pub async fn run(mut self) { + loop { + let msg = tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option??? + _ = &mut self.connection_supervisor_shutdown => { + self.connected = None; + continue; + } + Some(msg) = self.receiver.recv() => { + msg + }, + else => break, + }; + match msg { + CoreClientCommand::Connect => { + match self.connected { + Some(_) => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyConnected) + .await; + } + None => { + let mut jid = self.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + luz::connect_and_login(&mut jid, &*self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + debug!("ok stream result"); + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + shutdown_send, + jid.clone(), + self.password.clone(), + self.logic.clone(), + ); + + let shutdown_recv = shutdown_recv.fuse(); + self.connection_supervisor_shutdown = shutdown_recv; + + let connected = Connected { + jid, + write_handle: writer, + }; + + self.logic.clone().handle_connect(connected.clone()).await; + + self.connected = Some((connected, supervisor)); + } + Err(e) => { + tracing::error!("error: {}", e); + self.logic + .clone() + .handle_connection_error(ConnectionError::ConnectionFailed( + e.into(), + )) + .await; + } + } + } + }; + } + CoreClientCommand::Disconnect => match self.connected { + None => { + self.logic + .clone() + .handle_connection_error(ConnectionError::AlreadyDisconnected) + .await; + } + ref mut c => { + if let Some((connected, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + CoreClientCommand::Command(command) => { + match self.connected.as_ref() { + Some((w, s)) => self + .tasks + .spawn(self.logic.clone().handle_online(command, w.clone())), + None => self.tasks.spawn(self.logic.clone().handle_offline(command)), + }; + } + } + } + } +} + +// TODO: generate methods for each with a macro +pub enum CoreClientCommand<C> { + // TODO: login invisible xep-0186 + /// connect to XMPP chat server. gets roster and publishes initial presence. + Connect, + /// disconnect from XMPP chat server, sending unavailable presence then closing stream. + Disconnect, + /// TODO: generics + Command(C), +} diff --git a/lampada/src/main.rs b/lampada/src/main.rs new file mode 100644 index 0000000..7b7469d --- /dev/null +++ b/lampada/src/main.rs @@ -0,0 +1,42 @@ +use std::{path::Path, str::FromStr, time::Duration}; + +use jid::JID; +use lampada::{db::Db, CoreClientCommand, LuzHandle}; +use sqlx::SqlitePool; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::oneshot, +}; +use tracing::info; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let db = Db::create_connect_and_migrate(Path::new("./luz.db")) + .await + .unwrap(); + let (luz, mut recv) = + LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db); + + tokio::spawn(async move { + while let Some(msg) = recv.recv().await { + info!("{:#?}", msg) + } + }); + + luz.send(CoreClientCommand::Connect).await.unwrap(); + let (send, recv) = oneshot::channel(); + tokio::time::sleep(Duration::from_secs(5)).await; + info!("sending message"); + luz.send(CoreClientCommand::SendMessage( + JID::from_str("cel@blos.sm").unwrap(), + luz::chat::Body { + body: "hallo!!!".to_string(), + }, + send, + )) + .await + .unwrap(); + recv.await.unwrap().unwrap(); + println!("sent message"); +} |