aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
commit2211f324782cdc617b4b5ecd071178e372539fe4 (patch)
treea5ea5ce11d748424447dee23173d3cb8aec648ea /lampada/src
parent2f8671978e18c1e1e7834056ae674f32fbde3868 (diff)
downloadluz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz
luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2
luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to '')
-rw-r--r--lampada/src/connection/mod.rs (renamed from luz/src/connection/mod.rs)19
-rw-r--r--lampada/src/connection/read.rs (renamed from luz/src/connection/read.rs)15
-rw-r--r--lampada/src/connection/write.rs (renamed from luz/src/connection/write.rs)2
-rw-r--r--lampada/src/error.rs82
-rw-r--r--lampada/src/lib.rs238
-rw-r--r--lampada/src/main.rs (renamed from luz/src/main.rs)6
6 files changed, 337 insertions, 25 deletions
diff --git a/luz/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 288de70..1e767b0 100644
--- a/luz/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -7,8 +7,8 @@ use std::{
time::Duration,
};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use jid::JID;
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::client::Stanza;
use tokio::{
@@ -19,9 +19,8 @@ use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
use crate::{
- db::Db,
- error::{ConnectionError, Error, ReadError, WriteError},
- Connected, Logic, LogicState, UpdateMessage,
+ error::{ConnectionError, WriteError},
+ Connected, Logic,
};
mod read;
@@ -84,7 +83,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
match msg {
SupervisorCommand::Disconnect => {
info!("disconnecting");
- // TODO: do handle_disconnect here
+ self.logic
+ .handle_disconnect(self.connected.clone())
+ .await;
let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
info!("sent disconnect command");
@@ -108,11 +109,11 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
// send abort to read stream, as already done, consider
let (read_state, mut write_state);
match state {
- // TODO: proper state things for read and write thread
ChildState::Write(receiver) => {
write_state = receiver;
let (send, recv) = oneshot::channel();
let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
if let Ok(state) = recv.await {
read_state = state;
} else {
@@ -135,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
// TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
@@ -182,7 +183,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
// TODO: same here
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
@@ -226,7 +227,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
diff --git a/luz/src/connection/read.rs b/lampada/src/connection/read.rs
index 4e55bc5..cc69387 100644
--- a/luz/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -7,24 +7,15 @@ use std::{
time::Duration,
};
-use chrono::{DateTime, Utc};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
-use uuid::Uuid;
-
-use crate::{
- chat::{Body, Message},
- db::Db,
- error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
- presence::{Offline, Online, Presence, PresenceType, Show},
- roster::Contact,
- Connected, Logic, LogicState, UpdateMessage,
-};
+
+use crate::{Connected, Logic};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
diff --git a/luz/src/connection/write.rs b/lampada/src/connection/write.rs
index ff78b81..8f0c34b 100644
--- a/luz/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -1,6 +1,6 @@
use std::ops::{Deref, DerefMut};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot},
diff --git a/lampada/src/error.rs b/lampada/src/error.rs
new file mode 100644
index 0000000..cdfb4db
--- /dev/null
+++ b/lampada/src/error.rs
@@ -0,0 +1,82 @@
+use std::sync::Arc;
+
+use stanza::client::Stanza;
+use thiserror::Error;
+use tokio::{
+ sync::{mpsc::error::SendError, oneshot::error::RecvError},
+ time::error::Elapsed,
+};
+
+#[derive(Debug, Error, Clone)]
+pub enum ConnectionError {
+ #[error("connection failed: {0}")]
+ ConnectionFailed(#[from] luz::Error),
+ #[error("already connected")]
+ AlreadyConnected,
+ #[error("already disconnected")]
+ AlreadyDisconnected,
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: Display for Content
+ #[error("disconnected")]
+ Disconnected,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum CommandError<T> {
+ #[error("actor: {0}")]
+ Actor(ActorError),
+ #[error("{0}")]
+ Error(#[from] T),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum WriteError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+ // TODO: should this be in writeerror or separate?
+ #[error("actor: {0}")]
+ Actor(#[from] ActorError),
+ #[error("disconnected")]
+ Disconnected,
+}
+
+// TODO: separate peanuts read and write error?
+// TODO: which crate
+#[derive(Debug, Error, Clone)]
+pub enum ReadError {
+ #[error("xml: {0}")]
+ XML(#[from] peanuts::Error),
+ #[error("lost connection")]
+ LostConnection,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum ActorError {
+ #[error("receive timed out")]
+ Timeout,
+ #[error("could not send message to actor, channel closed")]
+ Send,
+ #[error("could not receive message from actor, channel closed")]
+ Receive,
+}
+
+impl From<Elapsed> for ActorError {
+ fn from(_e: Elapsed) -> Self {
+ Self::Timeout
+ }
+}
+
+impl<T> From<SendError<T>> for ActorError {
+ fn from(_e: SendError<T>) -> Self {
+ Self::Send
+ }
+}
+
+impl From<RecvError> for ActorError {
+ fn from(_e: RecvError) -> Self {
+ Self::Receive
+ }
+}
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
new file mode 100644
index 0000000..c61c596
--- /dev/null
+++ b/lampada/src/lib.rs
@@ -0,0 +1,238 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+pub use connection::write::WriteMessage;
+pub use connection::SupervisorSender;
+use error::ConnectionError;
+use futures::{future::Fuse, FutureExt};
+use luz::JID;
+use stanza::client::{
+ iq::{self, Iq, IqType},
+ Stanza,
+};
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::JoinSet,
+ time::timeout,
+};
+use tracing::{debug, info};
+
+use crate::connection::write::WriteHandle;
+use crate::connection::{SupervisorCommand, SupervisorHandle};
+
+mod connection;
+pub mod error;
+
+#[derive(Clone)]
+pub struct Connected {
+ // full jid will stay stable across reconnections
+ jid: JID,
+ write_handle: WriteHandle,
+}
+
+impl Connected {
+ pub fn jid(&self) -> &JID {
+ &self.jid
+ }
+
+ pub fn write_handle(&self) -> &WriteHandle {
+ &self.write_handle
+ }
+}
+
+/// everything that a particular xmpp client must implement
+pub trait Logic {
+ /// the command message type
+ type Cmd;
+
+ /// run after binding to the stream (e.g. for a chat client, )
+ fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run before closing the stream (e.g. send unavailable presence in a chat client)
+ fn handle_disconnect(
+ self,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + Send;
+
+ /// run to handle an incoming xmpp stanza
+ fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when a connection is currently established
+ fn handle_online(
+ self,
+ command: Self::Cmd,
+ connection: Connected,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run to handle a command message when disconnected
+ fn handle_offline(
+ self,
+ command: Self::Cmd,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
+ fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ /// handle connection errors from the core client logic
+ fn handle_connection_error(
+ self,
+ error: ConnectionError,
+ ) -> impl std::future::Future<Output = ()> + std::marker::Send;
+
+ // async fn handle_stream_error(self, error) {}
+}
+
+/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
+pub struct CoreClient<Lgc: Logic> {
+ jid: JID,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ password: Arc<String>,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // connected_intention: bool,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ // config: LampConfig,
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
+ tasks: JoinSet<()>,
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
+ /// create a new actor
+ pub fn new(
+ jid: JID,
+ password: String,
+ receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ jid,
+ password: Arc::new(password),
+ connected,
+ receiver,
+ connection_supervisor_shutdown,
+ logic,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ /// run the actor
+ pub async fn run(mut self) {
+ loop {
+ let msg = tokio::select! {
+ // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
+ // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
+ _ = &mut self.connection_supervisor_shutdown => {
+ self.connected = None;
+ continue;
+ }
+ Some(msg) = self.receiver.recv() => {
+ msg
+ },
+ else => break,
+ };
+ match msg {
+ CoreClientCommand::Connect => {
+ match self.connected {
+ Some(_) => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyConnected)
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ luz::connect_and_login(&mut jid, &*self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ debug!("ok stream result");
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ shutdown_send,
+ jid.clone(),
+ self.password.clone(),
+ self.logic.clone(),
+ );
+
+ let shutdown_recv = shutdown_recv.fuse();
+ self.connection_supervisor_shutdown = shutdown_recv;
+
+ let connected = Connected {
+ jid,
+ write_handle: writer,
+ };
+
+ self.logic.clone().handle_connect(connected.clone()).await;
+
+ self.connected = Some((connected, supervisor));
+ }
+ Err(e) => {
+ tracing::error!("error: {}", e);
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::ConnectionFailed(
+ e.into(),
+ ))
+ .await;
+ }
+ }
+ }
+ };
+ }
+ CoreClientCommand::Disconnect => match self.connected {
+ None => {
+ self.logic
+ .clone()
+ .handle_connection_error(ConnectionError::AlreadyDisconnected)
+ .await;
+ }
+ ref mut c => {
+ if let Some((connected, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ CoreClientCommand::Command(command) => {
+ match self.connected.as_ref() {
+ Some((w, s)) => self
+ .tasks
+ .spawn(self.logic.clone().handle_online(command, w.clone())),
+ None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ };
+ }
+ }
+ }
+ }
+}
+
+// TODO: generate methods for each with a macro
+pub enum CoreClientCommand<C> {
+ // TODO: login invisible xep-0186
+ /// connect to XMPP chat server. gets roster and publishes initial presence.
+ Connect,
+ /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
+ Disconnect,
+ /// TODO: generics
+ Command(C),
+}
diff --git a/luz/src/main.rs b/lampada/src/main.rs
index 5aeef14..7b7469d 100644
--- a/luz/src/main.rs
+++ b/lampada/src/main.rs
@@ -1,7 +1,7 @@
use std::{path::Path, str::FromStr, time::Duration};
use jid::JID;
-use luz::{db::Db, LuzHandle, LuzMessage};
+use lampada::{db::Db, CoreClientCommand, LuzHandle};
use sqlx::SqlitePool;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@@ -24,11 +24,11 @@ async fn main() {
}
});
- luz.send(LuzMessage::Connect).await.unwrap();
+ luz.send(CoreClientCommand::Connect).await.unwrap();
let (send, recv) = oneshot::channel();
tokio::time::sleep(Duration::from_secs(5)).await;
info!("sending message");
- luz.send(LuzMessage::SendMessage(
+ luz.send(CoreClientCommand::SendMessage(
JID::from_str("cel@blos.sm").unwrap(),
luz::chat::Body {
body: "hallo!!!".to_string(),