aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-11 22:20:16 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-11 22:20:16 +0000
commitad0054ea56747abc6454aa81f20b9c0653aa0da1 (patch)
tree8014b679cee29baaa9b5d103464ed6f91dd1f383
parent36348285317f6e073581479821564ddf825777c7 (diff)
downloadluz-ad0054ea56747abc6454aa81f20b9c0653aa0da1.tar.gz
luz-ad0054ea56747abc6454aa81f20b9c0653aa0da1.tar.bz2
luz-ad0054ea56747abc6454aa81f20b9c0653aa0da1.zip
actors complete ?
-rw-r--r--luz/.gitignore1
-rw-r--r--luz/Cargo.toml2
-rw-r--r--luz/src/connection/read.rs30
-rw-r--r--luz/src/connection/write.rs2
-rw-r--r--luz/src/error.rs1
-rw-r--r--luz/src/lib.rs95
-rw-r--r--luz/src/main.rs24
7 files changed, 129 insertions, 26 deletions
diff --git a/luz/.gitignore b/luz/.gitignore
new file mode 100644
index 0000000..1a2cec2
--- /dev/null
+++ b/luz/.gitignore
@@ -0,0 +1 @@
+luz.db
diff --git a/luz/Cargo.toml b/luz/Cargo.toml
index 6b2066b..ebff7d9 100644
--- a/luz/Cargo.toml
+++ b/luz/Cargo.toml
@@ -13,3 +13,5 @@ stanza = { version = "0.1.0", path = "../stanza" }
tokio = "1.42.0"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
+tracing = "0.1.41"
+tracing-subscriber = "0.3.19"
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index c1e37b4..3c61780 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -80,10 +80,15 @@ impl Read {
}
async fn run(mut self) {
+ println!("started read thread");
+ // let stanza = self.stream.read::<Stanza>().await;
+ // println!("{:?}", stanza);
loop {
tokio::select! {
// if still haven't received the end tag in time, just kill itself
- _ = &mut self.disconnect_timedout => {
+ // TODO: is this okay??? what if notification thread dies?
+ Ok(()) = &mut self.disconnect_timedout => {
+ println!("disconnect_timedout");
break;
}
Some(msg) = self.control_receiver.recv() => {
@@ -99,17 +104,19 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
+ let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
break;
},
};
},
- stanza = self.stream.read::<Stanza>() => {
- match stanza {
+ s = self.stream.read::<Stanza>() => {
+ println!("read stanza");
+ match s {
Ok(s) => {
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
},
Err(e) => {
+ println!("error: {:?}", e);
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
// match e {
// peanuts::Error::ReadError(error) => todo!(),
@@ -134,7 +141,7 @@ impl Read {
break;
} else {
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
- let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
+ let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
}
break;
},
@@ -142,11 +149,12 @@ impl Read {
},
else => break
}
- // when it aborts, must clear iq map no matter what
- let mut iqs = self.pending_iqs.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(Error::LostConnection));
- }
+ }
+ println!("stopping read thread");
+ // when it aborts, must clear iq map no matter what
+ let mut iqs = self.pending_iqs.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(Error::LostConnection));
}
}
}
@@ -163,7 +171,7 @@ async fn handle_stanza(
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
) {
- todo!()
+ println!("{:?}", stanza)
}
pub enum ReadControl {
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
index 09638a8..18dba5c 100644
--- a/luz/src/connection/write.rs
+++ b/luz/src/connection/write.rs
@@ -18,7 +18,7 @@ pub struct Write {
}
pub struct WriteMessage {
- stanza: Stanza,
+ pub stanza: Stanza,
pub respond_to: oneshot::Sender<Result<(), Error>>,
}
diff --git a/luz/src/error.rs b/luz/src/error.rs
index 2809e8d..6c3fb5d 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -1,3 +1,4 @@
+#[derive(Debug)]
pub enum Error {
AlreadyConnected,
Jabber(jabber::Error),
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 9d8ea66..4d36a81 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -1,9 +1,19 @@
-use std::{collections::HashMap, sync::Arc};
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ sync::Arc,
+};
-use connection::SupervisorSender;
+use connection::{write::WriteMessage, SupervisorSender};
use jabber::JID;
use sqlx::SqlitePool;
-use stanza::{client::Stanza, roster};
+use stanza::{
+ client::{
+ iq::{self, Iq, IqType},
+ Stanza,
+ },
+ roster::{self, Query},
+};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
@@ -58,6 +68,7 @@ impl Luz {
async fn run(mut self) {
loop {
tokio::select! {
+ // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
_ = &mut self.connection_supervisor_shutdown => {
*self.connected.lock().await = None
}
@@ -66,7 +77,8 @@ impl Luz {
// TODO: dispatch commands separate tasks
match msg {
CommandMessage::Connect => {
- match self.connected.lock().await.as_ref() {
+ let mut connection_lock = self.connected.lock().await;
+ match connection_lock.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
@@ -92,7 +104,10 @@ impl Luz {
self.pending_iqs.clone(),
);
self.connection_supervisor_shutdown = shutdown_recv;
- *self.connected.lock().await = Some((writer, supervisor));
+ *connection_lock = Some((writer, supervisor));
+ self.sender
+ .send(UpdateMessage::Connected)
+ .await;
}
Err(e) => {
self.sender.send(UpdateMessage::Error(e.into()));
@@ -161,18 +176,70 @@ impl CommandMessage {
sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) {
- todo!()
+ match self {
+ CommandMessage::Connect => unreachable!(),
+ CommandMessage::Disconnect => unreachable!(),
+ CommandMessage::GetRoster => {
+ // TODO: jid resource should probably be stored within the connection
+ let owned_jid: JID;
+ {
+ owned_jid = jid.lock().await.clone();
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(owned_jid),
+ id: "getting-roster".to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = write_handle
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ match recv.await {
+ Ok(Ok(())) => println!("roster request sent"),
+ e => println!("error: {:?}", e),
+ };
+ }
+ CommandMessage::SendMessage(jid, _) => todo!(),
+ }
}
}
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
+// #[derive(Clone)]
pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>,
- receiver: mpsc::Receiver<UpdateMessage>,
+}
+
+impl Deref for LuzHandle {
+ type Target = mpsc::Sender<CommandMessage>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for LuzHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
}
impl LuzHandle {
- pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
+ pub fn new(
+ jid: JID,
+ password: String,
+ db: SqlitePool,
+ ) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
@@ -189,10 +256,12 @@ impl LuzHandle {
);
tokio::spawn(async move { actor.run().await });
- Self {
- sender: command_sender,
- receiver: update_receiver,
- }
+ (
+ Self {
+ sender: command_sender,
+ },
+ update_receiver,
+ )
}
}
@@ -204,7 +273,9 @@ pub enum CommandMessage {
SendMessage(JID, String),
}
+#[derive(Debug)]
pub enum UpdateMessage {
Error(Error),
+ Connected,
Roster(Vec<roster::Item>),
}
diff --git a/luz/src/main.rs b/luz/src/main.rs
index e7a11a9..7b3815f 100644
--- a/luz/src/main.rs
+++ b/luz/src/main.rs
@@ -1,3 +1,23 @@
-fn main() {
- println!("Hello, world!");
+use std::time::Duration;
+
+use luz::{CommandMessage, LuzHandle};
+use sqlx::SqlitePool;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+#[tokio::main]
+async fn main() {
+ tracing_subscriber::fmt::init();
+ let db = SqlitePool::connect("./luz.db").await.unwrap();
+ let (luz, mut recv) =
+ LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
+
+ tokio::spawn(async move {
+ while let Some(msg) = recv.recv().await {
+ println!("{:#?}", msg)
+ }
+ });
+
+ luz.send(CommandMessage::Connect).await.unwrap();
+ luz.send(CommandMessage::GetRoster).await.unwrap();
+ tokio::time::sleep(Duration::from_secs(15)).await;
}