diff options
| author | 2025-01-02 17:48:12 +0000 | |
|---|---|---|
| committer | 2025-01-02 17:48:12 +0000 | |
| commit | 0e5f09b2bd05690f3d28f7076629031fcc2cc6e6 (patch) | |
| tree | 29409764a94d570c8d9e929f1751a678355cb6ee | |
| parent | 89351e956368ad3112127570ef03dd2547730ce5 (diff) | |
| download | luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.gz luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.tar.bz2 luz-0e5f09b2bd05690f3d28f7076629031fcc2cc6e6.zip | |
WIP: client
| -rw-r--r-- | README.md | 1 | ||||
| -rw-r--r-- | jabber/src/client.rs | 5 | ||||
| -rw-r--r-- | jabber/src/jabber_stream/bound_stream.rs | 1 | ||||
| -rw-r--r-- | luz/Cargo.toml | 6 | ||||
| -rw-r--r-- | luz/src/lib.rs | 171 | ||||
| -rw-r--r-- | stanza/src/client/iq.rs | 6 | ||||
| -rw-r--r-- | stanza/src/roster.rs | 7 | 
7 files changed, 194 insertions, 3 deletions
| @@ -20,6 +20,7 @@  - [x] rfc 7590: tls  - [x] xep-0368: srv records for xmpp over tls  - [ ] server side downgrade protection for sasl +- [x] xep-0199: xmpp ping  - [ ] xep-0030: service discovery  - [ ] xep-0115: entity capabilities  - [ ] xep-0163: pep diff --git a/jabber/src/client.rs b/jabber/src/client.rs index f5d5dc7..2e59d98 100644 --- a/jabber/src/client.rs +++ b/jabber/src/client.rs @@ -26,6 +26,7 @@ use crate::{  pub struct JabberClient {      connection: ConnectionState,      jid: JID, +    // TODO: have reconnection be handled by another part, so creds don't need to be stored in object      password: Arc<SASLConfig>,      server: String,  } @@ -49,6 +50,10 @@ impl JabberClient {          })      } +    pub fn jid(&self) -> JID { +        self.jid.clone() +    } +      pub async fn connect(&mut self) -> Result<()> {          match &self.connection {              ConnectionState::Disconnected => { diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs index c0d67b0..627158a 100644 --- a/jabber/src/jabber_stream/bound_stream.rs +++ b/jabber/src/jabber_stream/bound_stream.rs @@ -63,6 +63,7 @@ where          if let Some(_write_handle) = this.write_handle {              panic!("start_send called without poll_ready")          } else { +            // TODO: switch to buffer of one rather than thread spawning and joining              *this.write_handle = Some(tokio::spawn(write(this.writer.clone(), item)));              Ok(())          } diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 11d4197..646d8e8 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0"  edition = "2021"  [dependencies] +futures = "0.3.31" +jabber = { version = "0.1.0", path = "../jabber" } +stanza = { version = "0.1.0", path = "../stanza" } +tokio = "1.42.0" +tokio-stream = "0.1.17" +tokio-util = "0.7.13" diff --git a/luz/src/lib.rs b/luz/src/lib.rs new file mode 100644 index 0000000..1a750a3 --- /dev/null +++ b/luz/src/lib.rs @@ -0,0 +1,171 @@ +use std::{ +    collections::{HashMap, HashSet, VecDeque}, +    pin::pin, +    task::{ready, Poll}, +    thread::JoinHandle, +}; + +use futures::{ +    stream::{SplitSink, SplitStream}, +    Sink, SinkExt, Stream, StreamExt, +}; +use jabber::{client::JabberClient, JID}; +use stanza::{ +    client::{ +        iq::{Iq, IqType, Query}, +        Stanza, +    }, +    roster, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::{PollSendError, PollSender}; + +pub struct Client { +    client: JabberClient, +    pending_iqs: HashMap<String, mpsc::Sender<Iq>>, +    // database connection (sqlite) +    receiver: ReceiverStream<UpdateMessage>, +    sender: PollSender<CommandMessage>, +} + +impl Client { +    pub async fn new(jid: String, password: &str) -> Result<Self, Error> { +        let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20); +        let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20); +        let mut jabber_client = JabberClient::new(jid, password)?; +        jabber_client.connect().await?; +        let (write, read) = jabber_client.split(); +        let client = Self { +            client: jabber_client, +            receiver: ReceiverStream::new(read_receiver), +            sender: PollSender::new(write_sender), +            pending_iqs: HashMap::new(), +        }; +        tokio::spawn(client.process_read(read, read_sender)); +        tokio::spawn(client.process_write(write, write_receiver)); +        Ok(client) +    } + +    pub async fn process_read( +        &self, +        mut stream: SplitStream<JabberClient>, +        sender: mpsc::Sender<UpdateMessage>, +    ) { +        for stanza in stream.next().await { +            tokio::spawn(self.process_stanza(stanza, sender.clone())); +        } +    } + +    pub async fn process_write( +        &self, +        mut sink: SplitSink<JabberClient, Stanza>, +        receiver: mpsc::Receiver<CommandMessage>, +    ) { +        for message in receiver.recv_many(, ) +    } +} + +pub enum Error { +    PollSend(PollSendError<CommandMessage>), +    Jabber(jabber::Error), +} + +impl From<jabber::Error> for Error { +    fn from(e: jabber::Error) -> Self { +        Self::Jabber(e) +    } +} + +impl Stream for Client { +    type Item = UpdateMessage; + +    fn poll_next( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> std::task::Poll<Option<Self::Item>> { +        pin!(self).receiver.poll_next_unpin(cx) +    } +} + +impl Sink<CommandMessage> for Client { +    type Error = Error; + +    fn poll_ready( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> Poll<Result<(), Self::Error>> { +        Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx))) +    } + +    fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> { +        todo!() +    } + +    fn poll_flush( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> Poll<Result<(), Self::Error>> { +        todo!() +    } + +    fn poll_close( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> Poll<Result<(), Self::Error>> { +        todo!() +    } +} + +impl From<PollSendError<CommandMessage>> for Error { +    fn from(e: PollSendError<CommandMessage>) -> Self { +        Self::PollSend(e) +    } +} + +pub enum CommandMessage { +    Connect, +    GetRoster, +    SendMessage(JID, String), +} + +pub enum UpdateMessage { +    Roster(Vec<roster::Item>), +} + +impl Client { +    pub async fn process_stanza( +        &mut self, +        stanza: Result<Stanza, jabber::Error>, +        sender: mpsc::Sender<UpdateMessage>, +    ) { +        match stanza { +            Ok(stanza) => todo!(), +            Err(e) => self.process_error(e), +        } +    } + +    pub async fn iq( +        &mut self, +        to: Option<JID>, +        r#type: IqType, +        query: Option<Query>, +    ) -> Result<IqResponse, Error> { +        self.client +            .send(Stanza::Iq(Iq { +                from: Some(self.client.jid()), +                // TODO: generate id +                id: "test".to_string(), +                to, +                r#type, +                // TODO: lang +                lang: None, +                query, +                errors: Vec::new(), +            })) +            .await?; +        Ok(todo!()) +    } + +    pub async fn iq_process(&mut self, iq: Iq) {} +} diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs index 6ee80ea..2e87636 100644 --- a/stanza/src/client/iq.rs +++ b/stanza/src/client/iq.rs @@ -9,6 +9,7 @@ use peanuts::{  use crate::{      bind::{self, Bind},      client::error::Error, +    roster,      xep_0199::{self, Ping},  }; @@ -31,6 +32,7 @@ pub struct Iq {  pub enum Query {      Bind(Bind),      Ping(Ping), +    Roster(roster::Query),      Unsupported,  } @@ -39,6 +41,9 @@ impl FromElement for Query {          match element.identify() {              (Some(bind::XMLNS), "bind") => Ok(Query::Bind(Bind::from_element(element)?)),              (Some(xep_0199::XMLNS), "ping") => Ok(Query::Ping(Ping::from_element(element)?)), +            (Some(roster::XMLNS), "query") => { +                Ok(Query::Roster(roster::Query::from_element(element)?)) +            }              _ => Ok(Query::Unsupported),          }      } @@ -49,6 +54,7 @@ impl IntoElement for Query {          match self {              Query::Bind(bind) => bind.builder(),              Query::Ping(ping) => ping.builder(), +            Query::Roster(query) => query.builder(),              // TODO: consider what to do if attempt to serialize unsupported              Query::Unsupported => todo!(),          } diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs index b49fcc3..9209fad 100644 --- a/stanza/src/roster.rs +++ b/stanza/src/roster.rs @@ -8,6 +8,7 @@ use peanuts::{  pub const XMLNS: &str = "jabber:iq:roster"; +#[derive(Debug, Clone)]  pub struct Query {      ver: Option<String>,      items: Vec<Item>, @@ -33,7 +34,7 @@ impl IntoElement for Query {      }  } -#[derive(Clone)] +#[derive(Clone, Debug)]  pub struct Item {      approved: Option<bool>,      ask: bool, @@ -95,7 +96,7 @@ impl IntoElement for Item {      }  } -#[derive(Default, Clone, Copy)] +#[derive(Default, Clone, Copy, Debug)]  pub enum Subscription {      Both,      From, @@ -132,7 +133,7 @@ impl ToString for Subscription {      }  } -#[derive(Clone)] +#[derive(Clone, Debug)]  pub struct Group(Option<String>);  impl FromElement for Group { | 
