aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-01-02 17:48:12 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-01-02 17:48:12 +0000
commit0e5f09b2bd05690f3d28f7076629031fcc2cc6e6 (patch)
tree29409764a94d570c8d9e929f1751a678355cb6ee
parent89351e956368ad3112127570ef03dd2547730ce5 (diff)
downloadluz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.gz
luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.bz2
luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.zip
WIP: client
-rw-r--r--README.md1
-rw-r--r--jabber/src/client.rs5
-rw-r--r--jabber/src/jabber_stream/bound_stream.rs1
-rw-r--r--luz/Cargo.toml6
-rw-r--r--luz/src/lib.rs171
-rw-r--r--stanza/src/client/iq.rs6
-rw-r--r--stanza/src/roster.rs7
7 files changed, 194 insertions, 3 deletions
diff --git a/README.md b/README.md
index 63094ae..b40173a 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@
- [x] rfc 7590: tls
- [x] xep-0368: srv records for xmpp over tls
- [ ] server side downgrade protection for sasl
+- [x] xep-0199: xmpp ping
- [ ] xep-0030: service discovery
- [ ] xep-0115: entity capabilities
- [ ] xep-0163: pep
diff --git a/jabber/src/client.rs b/jabber/src/client.rs
index f5d5dc7..2e59d98 100644
--- a/jabber/src/client.rs
+++ b/jabber/src/client.rs
@@ -26,6 +26,7 @@ use crate::{
pub struct JabberClient {
connection: ConnectionState,
jid: JID,
+ // TODO: have reconnection be handled by another part, so creds don't need to be stored in object
password: Arc<SASLConfig>,
server: String,
}
@@ -49,6 +50,10 @@ impl JabberClient {
})
}
+ pub fn jid(&self) -> JID {
+ self.jid.clone()
+ }
+
pub async fn connect(&mut self) -> Result<()> {
match &self.connection {
ConnectionState::Disconnected => {
diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs
index c0d67b0..627158a 100644
--- a/jabber/src/jabber_stream/bound_stream.rs
+++ b/jabber/src/jabber_stream/bound_stream.rs
@@ -63,6 +63,7 @@ where
if let Some(_write_handle) = this.write_handle {
panic!("start_send called without poll_ready")
} else {
+ // TODO: switch to buffer of one rather than thread spawning and joining
*this.write_handle = Some(tokio::spawn(write(this.writer.clone(), item)));
Ok(())
}
diff --git a/luz/Cargo.toml b/luz/Cargo.toml
index 11d4197..646d8e8 100644
--- a/luz/Cargo.toml
+++ b/luz/Cargo.toml
@@ -4,3 +4,9 @@ version = "0.1.0"
edition = "2021"
[dependencies]
+futures = "0.3.31"
+jabber = { version = "0.1.0", path = "../jabber" }
+stanza = { version = "0.1.0", path = "../stanza" }
+tokio = "1.42.0"
+tokio-stream = "0.1.17"
+tokio-util = "0.7.13"
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
new file mode 100644
index 0000000..1a750a3
--- /dev/null
+++ b/luz/src/lib.rs
@@ -0,0 +1,171 @@
+use std::{
+ collections::{HashMap, HashSet, VecDeque},
+ pin::pin,
+ task::{ready, Poll},
+ thread::JoinHandle,
+};
+
+use futures::{
+ stream::{SplitSink, SplitStream},
+ Sink, SinkExt, Stream, StreamExt,
+};
+use jabber::{client::JabberClient, JID};
+use stanza::{
+ client::{
+ iq::{Iq, IqType, Query},
+ Stanza,
+ },
+ roster,
+};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::sync::{PollSendError, PollSender};
+
+pub struct Client {
+ client: JabberClient,
+ pending_iqs: HashMap<String, mpsc::Sender<Iq>>,
+ // database connection (sqlite)
+ receiver: ReceiverStream<UpdateMessage>,
+ sender: PollSender<CommandMessage>,
+}
+
+impl Client {
+ pub async fn new(jid: String, password: &str) -> Result<Self, Error> {
+ let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
+ let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
+ let mut jabber_client = JabberClient::new(jid, password)?;
+ jabber_client.connect().await?;
+ let (write, read) = jabber_client.split();
+ let client = Self {
+ client: jabber_client,
+ receiver: ReceiverStream::new(read_receiver),
+ sender: PollSender::new(write_sender),
+ pending_iqs: HashMap::new(),
+ };
+ tokio::spawn(client.process_read(read, read_sender));
+ tokio::spawn(client.process_write(write, write_receiver));
+ Ok(client)
+ }
+
+ pub async fn process_read(
+ &self,
+ mut stream: SplitStream<JabberClient>,
+ sender: mpsc::Sender<UpdateMessage>,
+ ) {
+ for stanza in stream.next().await {
+ tokio::spawn(self.process_stanza(stanza, sender.clone()));
+ }
+ }
+
+ pub async fn process_write(
+ &self,
+ mut sink: SplitSink<JabberClient, Stanza>,
+ receiver: mpsc::Receiver<CommandMessage>,
+ ) {
+ for message in receiver.recv_many(, )
+ }
+}
+
+pub enum Error {
+ PollSend(PollSendError<CommandMessage>),
+ Jabber(jabber::Error),
+}
+
+impl From<jabber::Error> for Error {
+ fn from(e: jabber::Error) -> Self {
+ Self::Jabber(e)
+ }
+}
+
+impl Stream for Client {
+ type Item = UpdateMessage;
+
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ pin!(self).receiver.poll_next_unpin(cx)
+ }
+}
+
+impl Sink<CommandMessage> for Client {
+ type Error = Error;
+
+ fn poll_ready(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx)))
+ }
+
+ fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> {
+ todo!()
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+
+ fn poll_close(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+}
+
+impl From<PollSendError<CommandMessage>> for Error {
+ fn from(e: PollSendError<CommandMessage>) -> Self {
+ Self::PollSend(e)
+ }
+}
+
+pub enum CommandMessage {
+ Connect,
+ GetRoster,
+ SendMessage(JID, String),
+}
+
+pub enum UpdateMessage {
+ Roster(Vec<roster::Item>),
+}
+
+impl Client {
+ pub async fn process_stanza(
+ &mut self,
+ stanza: Result<Stanza, jabber::Error>,
+ sender: mpsc::Sender<UpdateMessage>,
+ ) {
+ match stanza {
+ Ok(stanza) => todo!(),
+ Err(e) => self.process_error(e),
+ }
+ }
+
+ pub async fn iq(
+ &mut self,
+ to: Option<JID>,
+ r#type: IqType,
+ query: Option<Query>,
+ ) -> Result<IqResponse, Error> {
+ self.client
+ .send(Stanza::Iq(Iq {
+ from: Some(self.client.jid()),
+ // TODO: generate id
+ id: "test".to_string(),
+ to,
+ r#type,
+ // TODO: lang
+ lang: None,
+ query,
+ errors: Vec::new(),
+ }))
+ .await?;
+ Ok(todo!())
+ }
+
+ pub async fn iq_process(&mut self, iq: Iq) {}
+}
diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs
index 6ee80ea..2e87636 100644
--- a/stanza/src/client/iq.rs
+++ b/stanza/src/client/iq.rs
@@ -9,6 +9,7 @@ use peanuts::{
use crate::{
bind::{self, Bind},
client::error::Error,
+ roster,
xep_0199::{self, Ping},
};
@@ -31,6 +32,7 @@ pub struct Iq {
pub enum Query {
Bind(Bind),
Ping(Ping),
+ Roster(roster::Query),
Unsupported,
}
@@ -39,6 +41,9 @@ impl FromElement for Query {
match element.identify() {
(Some(bind::XMLNS), "bind") => Ok(Query::Bind(Bind::from_element(element)?)),
(Some(xep_0199::XMLNS), "ping") => Ok(Query::Ping(Ping::from_element(element)?)),
+ (Some(roster::XMLNS), "query") => {
+ Ok(Query::Roster(roster::Query::from_element(element)?))
+ }
_ => Ok(Query::Unsupported),
}
}
@@ -49,6 +54,7 @@ impl IntoElement for Query {
match self {
Query::Bind(bind) => bind.builder(),
Query::Ping(ping) => ping.builder(),
+ Query::Roster(query) => query.builder(),
// TODO: consider what to do if attempt to serialize unsupported
Query::Unsupported => todo!(),
}
diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs
index b49fcc3..9209fad 100644
--- a/stanza/src/roster.rs
+++ b/stanza/src/roster.rs
@@ -8,6 +8,7 @@ use peanuts::{
pub const XMLNS: &str = "jabber:iq:roster";
+#[derive(Debug, Clone)]
pub struct Query {
ver: Option<String>,
items: Vec<Item>,
@@ -33,7 +34,7 @@ impl IntoElement for Query {
}
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Item {
approved: Option<bool>,
ask: bool,
@@ -95,7 +96,7 @@ impl IntoElement for Item {
}
}
-#[derive(Default, Clone, Copy)]
+#[derive(Default, Clone, Copy, Debug)]
pub enum Subscription {
Both,
From,
@@ -132,7 +133,7 @@ impl ToString for Subscription {
}
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Group(Option<String>);
impl FromElement for Group {