diff options
author | 2025-01-02 17:48:12 +0000 | |
---|---|---|
committer | 2025-01-02 17:48:12 +0000 | |
commit | 0e5f09b2bd05690f3d28f7076629031fcc2cc6e6 (patch) | |
tree | 29409764a94d570c8d9e929f1751a678355cb6ee | |
parent | 89351e956368ad3112127570ef03dd2547730ce5 (diff) | |
download | luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.gz luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.bz2 luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.zip |
WIP: client
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | jabber/src/client.rs | 5 | ||||
-rw-r--r-- | jabber/src/jabber_stream/bound_stream.rs | 1 | ||||
-rw-r--r-- | luz/Cargo.toml | 6 | ||||
-rw-r--r-- | luz/src/lib.rs | 171 | ||||
-rw-r--r-- | stanza/src/client/iq.rs | 6 | ||||
-rw-r--r-- | stanza/src/roster.rs | 7 |
7 files changed, 194 insertions, 3 deletions
@@ -20,6 +20,7 @@ - [x] rfc 7590: tls - [x] xep-0368: srv records for xmpp over tls - [ ] server side downgrade protection for sasl +- [x] xep-0199: xmpp ping - [ ] xep-0030: service discovery - [ ] xep-0115: entity capabilities - [ ] xep-0163: pep diff --git a/jabber/src/client.rs b/jabber/src/client.rs index f5d5dc7..2e59d98 100644 --- a/jabber/src/client.rs +++ b/jabber/src/client.rs @@ -26,6 +26,7 @@ use crate::{ pub struct JabberClient { connection: ConnectionState, jid: JID, + // TODO: have reconnection be handled by another part, so creds don't need to be stored in object password: Arc<SASLConfig>, server: String, } @@ -49,6 +50,10 @@ impl JabberClient { }) } + pub fn jid(&self) -> JID { + self.jid.clone() + } + pub async fn connect(&mut self) -> Result<()> { match &self.connection { ConnectionState::Disconnected => { diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs index c0d67b0..627158a 100644 --- a/jabber/src/jabber_stream/bound_stream.rs +++ b/jabber/src/jabber_stream/bound_stream.rs @@ -63,6 +63,7 @@ where if let Some(_write_handle) = this.write_handle { panic!("start_send called without poll_ready") } else { + // TODO: switch to buffer of one rather than thread spawning and joining *this.write_handle = Some(tokio::spawn(write(this.writer.clone(), item))); Ok(()) } diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 11d4197..646d8e8 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +futures = "0.3.31" +jabber = { version = "0.1.0", path = "../jabber" } +stanza = { version = "0.1.0", path = "../stanza" } +tokio = "1.42.0" +tokio-stream = "0.1.17" +tokio-util = "0.7.13" diff --git a/luz/src/lib.rs b/luz/src/lib.rs new file mode 100644 index 0000000..1a750a3 --- /dev/null +++ b/luz/src/lib.rs @@ -0,0 +1,171 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + pin::pin, + task::{ready, Poll}, + thread::JoinHandle, +}; + +use futures::{ + stream::{SplitSink, SplitStream}, + Sink, SinkExt, Stream, StreamExt, +}; +use jabber::{client::JabberClient, JID}; +use stanza::{ + client::{ + iq::{Iq, IqType, Query}, + Stanza, + }, + roster, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::{PollSendError, PollSender}; + +pub struct Client { + client: JabberClient, + pending_iqs: HashMap<String, mpsc::Sender<Iq>>, + // database connection (sqlite) + receiver: ReceiverStream<UpdateMessage>, + sender: PollSender<CommandMessage>, +} + +impl Client { + pub async fn new(jid: String, password: &str) -> Result<Self, Error> { + let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20); + let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20); + let mut jabber_client = JabberClient::new(jid, password)?; + jabber_client.connect().await?; + let (write, read) = jabber_client.split(); + let client = Self { + client: jabber_client, + receiver: ReceiverStream::new(read_receiver), + sender: PollSender::new(write_sender), + pending_iqs: HashMap::new(), + }; + tokio::spawn(client.process_read(read, read_sender)); + tokio::spawn(client.process_write(write, write_receiver)); + Ok(client) + } + + pub async fn process_read( + &self, + mut stream: SplitStream<JabberClient>, + sender: mpsc::Sender<UpdateMessage>, + ) { + for stanza in stream.next().await { + tokio::spawn(self.process_stanza(stanza, sender.clone())); + } + } + + pub async fn process_write( + &self, + mut sink: SplitSink<JabberClient, Stanza>, + receiver: mpsc::Receiver<CommandMessage>, + ) { + for message in receiver.recv_many(, ) + } +} + +pub enum Error { + PollSend(PollSendError<CommandMessage>), + Jabber(jabber::Error), +} + +impl From<jabber::Error> for Error { + fn from(e: jabber::Error) -> Self { + Self::Jabber(e) + } +} + +impl Stream for Client { + type Item = UpdateMessage; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + pin!(self).receiver.poll_next_unpin(cx) + } +} + +impl Sink<CommandMessage> for Client { + type Error = Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx))) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + todo!() + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + todo!() + } +} + +impl From<PollSendError<CommandMessage>> for Error { + fn from(e: PollSendError<CommandMessage>) -> Self { + Self::PollSend(e) + } +} + +pub enum CommandMessage { + Connect, + GetRoster, + SendMessage(JID, String), +} + +pub enum UpdateMessage { + Roster(Vec<roster::Item>), +} + +impl Client { + pub async fn process_stanza( + &mut self, + stanza: Result<Stanza, jabber::Error>, + sender: mpsc::Sender<UpdateMessage>, + ) { + match stanza { + Ok(stanza) => todo!(), + Err(e) => self.process_error(e), + } + } + + pub async fn iq( + &mut self, + to: Option<JID>, + r#type: IqType, + query: Option<Query>, + ) -> Result<IqResponse, Error> { + self.client + .send(Stanza::Iq(Iq { + from: Some(self.client.jid()), + // TODO: generate id + id: "test".to_string(), + to, + r#type, + // TODO: lang + lang: None, + query, + errors: Vec::new(), + })) + .await?; + Ok(todo!()) + } + + pub async fn iq_process(&mut self, iq: Iq) {} +} diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs index 6ee80ea..2e87636 100644 --- a/stanza/src/client/iq.rs +++ b/stanza/src/client/iq.rs @@ -9,6 +9,7 @@ use peanuts::{ use crate::{ bind::{self, Bind}, client::error::Error, + roster, xep_0199::{self, Ping}, }; @@ -31,6 +32,7 @@ pub struct Iq { pub enum Query { Bind(Bind), Ping(Ping), + Roster(roster::Query), Unsupported, } @@ -39,6 +41,9 @@ impl FromElement for Query { match element.identify() { (Some(bind::XMLNS), "bind") => Ok(Query::Bind(Bind::from_element(element)?)), (Some(xep_0199::XMLNS), "ping") => Ok(Query::Ping(Ping::from_element(element)?)), + (Some(roster::XMLNS), "query") => { + Ok(Query::Roster(roster::Query::from_element(element)?)) + } _ => Ok(Query::Unsupported), } } @@ -49,6 +54,7 @@ impl IntoElement for Query { match self { Query::Bind(bind) => bind.builder(), Query::Ping(ping) => ping.builder(), + Query::Roster(query) => query.builder(), // TODO: consider what to do if attempt to serialize unsupported Query::Unsupported => todo!(), } diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs index b49fcc3..9209fad 100644 --- a/stanza/src/roster.rs +++ b/stanza/src/roster.rs @@ -8,6 +8,7 @@ use peanuts::{ pub const XMLNS: &str = "jabber:iq:roster"; +#[derive(Debug, Clone)] pub struct Query { ver: Option<String>, items: Vec<Item>, @@ -33,7 +34,7 @@ impl IntoElement for Query { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Item { approved: Option<bool>, ask: bool, @@ -95,7 +96,7 @@ impl IntoElement for Item { } } -#[derive(Default, Clone, Copy)] +#[derive(Default, Clone, Copy, Debug)] pub enum Subscription { Both, From, @@ -132,7 +133,7 @@ impl ToString for Subscription { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Group(Option<String>); impl FromElement for Group { |