diff options
Diffstat (limited to 'jabber/src')
| -rw-r--r-- | jabber/src/client.rs | 181 | ||||
| -rw-r--r-- | jabber/src/connection.rs | 182 | ||||
| -rw-r--r-- | jabber/src/error.rs | 58 | ||||
| -rw-r--r-- | jabber/src/jabber_stream.rs | 482 | ||||
| -rw-r--r-- | jabber/src/jabber_stream/bound_stream.rs | 87 | ||||
| -rw-r--r-- | jabber/src/lib.rs | 25 | 
6 files changed, 0 insertions, 1015 deletions
diff --git a/jabber/src/client.rs b/jabber/src/client.rs deleted file mode 100644 index de2be08..0000000 --- a/jabber/src/client.rs +++ /dev/null @@ -1,181 +0,0 @@ -use rsasl::config::SASLConfig; -use stanza::{ -    sasl::Mechanisms, -    stream::{Feature, Features}, -}; - -use crate::{ -    connection::{Tls, Unencrypted}, -    jabber_stream::bound_stream::BoundJabberStream, -    Connection, Error, JabberStream, Result, JID, -}; - -pub async fn connect_and_login( -    jid: &mut JID, -    password: impl AsRef<str>, -    server: &mut String, -) -> Result<BoundJabberStream<Tls>> { -    let auth = SASLConfig::with_credentials( -        None, -        jid.localpart.clone().ok_or(Error::NoLocalpart)?, -        password.as_ref().to_string(), -    ) -    .map_err(|e| Error::SASL(e.into()))?; -    let mut conn_state = Connecting::start(&server).await?; -    loop { -        match conn_state { -            Connecting::InsecureConnectionEstablised(tcp_stream) => { -                conn_state = Connecting::InsecureStreamStarted( -                    JabberStream::start_stream(tcp_stream, server).await?, -                ) -            } -            Connecting::InsecureStreamStarted(jabber_stream) => { -                conn_state = Connecting::InsecureGotFeatures(jabber_stream.get_features().await?) -            } -            Connecting::InsecureGotFeatures((features, jabber_stream)) => { -                match features.negotiate().ok_or(Error::Negotiation)? { -                    Feature::StartTls(_start_tls) => { -                        conn_state = Connecting::StartTls(jabber_stream) -                    } -                    // TODO: better error -                    _ => return Err(Error::TlsRequired), -                } -            } -            Connecting::StartTls(jabber_stream) => { -                conn_state = -                    Connecting::ConnectionEstablished(jabber_stream.starttls(&server).await?) -            } -            Connecting::ConnectionEstablished(tls_stream) => { -                conn_state = -                    Connecting::StreamStarted(JabberStream::start_stream(tls_stream, server).await?) -            } -            Connecting::StreamStarted(jabber_stream) => { -                conn_state = Connecting::GotFeatures(jabber_stream.get_features().await?) -            } -            Connecting::GotFeatures((features, jabber_stream)) => { -                match features.negotiate().ok_or(Error::Negotiation)? { -                    Feature::StartTls(_start_tls) => return Err(Error::AlreadyTls), -                    Feature::Sasl(mechanisms) => { -                        conn_state = Connecting::Sasl(mechanisms, jabber_stream) -                    } -                    Feature::Bind => conn_state = Connecting::Bind(jabber_stream), -                    Feature::Unknown => return Err(Error::Unsupported), -                } -            } -            Connecting::Sasl(mechanisms, jabber_stream) => { -                conn_state = Connecting::ConnectionEstablished( -                    jabber_stream.sasl(mechanisms, auth.clone()).await?, -                ) -            } -            Connecting::Bind(jabber_stream) => { -                return Ok(jabber_stream.bind(jid).await?.to_bound_jabber()); -            } -        } -    } -} - -pub enum Connecting { -    InsecureConnectionEstablised(Unencrypted), -    InsecureStreamStarted(JabberStream<Unencrypted>), -    InsecureGotFeatures((Features, JabberStream<Unencrypted>)), -    StartTls(JabberStream<Unencrypted>), -    ConnectionEstablished(Tls), -    StreamStarted(JabberStream<Tls>), -    GotFeatures((Features, JabberStream<Tls>)), -    Sasl(Mechanisms, JabberStream<Tls>), -    Bind(JabberStream<Tls>), -} - -impl Connecting { -    pub async fn start(server: &str) -> Result<Self> { -        match Connection::connect(server).await? { -            Connection::Encrypted(tls_stream) => Ok(Connecting::ConnectionEstablished(tls_stream)), -            Connection::Unencrypted(tcp_stream) => { -                Ok(Connecting::InsecureConnectionEstablised(tcp_stream)) -            } -        } -    } -} - -pub enum InsecureConnecting { -    Disconnected, -    ConnectionEstablished(Connection), -    PreStarttls(JabberStream<Unencrypted>), -    PreAuthenticated(JabberStream<Tls>), -    Authenticated(Tls), -    PreBound(JabberStream<Tls>), -    Bound(JabberStream<Tls>), -} - -#[cfg(test)] -mod tests { -    use std::time::Duration; - -    use jid::JID; -    use stanza::{ -        client::{ -            iq::{Iq, IqType, Query}, -            Stanza, -        }, -        xep_0199::Ping, -    }; -    use test_log::test; -    use tokio::time::sleep; -    use tracing::info; - -    use super::connect_and_login; - -    #[test(tokio::test)] -    async fn login() { -        let mut jid: JID = "test@blos.sm".try_into().unwrap(); -        let _client = connect_and_login(&mut jid, "slayed", &mut "blos.sm".to_string()) -            .await -            .unwrap(); -        sleep(Duration::from_secs(5)).await -    } - -    #[test(tokio::test)] -    async fn ping_parallel() { -        let mut jid: JID = "test@blos.sm".try_into().unwrap(); -        let mut server = "blos.sm".to_string(); -        let client = connect_and_login(&mut jid, "slayed", &mut server) -            .await -            .unwrap(); -        let (mut read, mut write) = client.split(); - -        tokio::join!( -            async { -                write -                    .write(&Stanza::Iq(Iq { -                        from: Some(jid.clone()), -                        id: "c2s1".to_string(), -                        to: Some(server.clone().try_into().unwrap()), -                        r#type: IqType::Get, -                        lang: None, -                        query: Some(Query::Ping(Ping)), -                        errors: Vec::new(), -                    })) -                    .await -                    .unwrap(); -                write -                    .write(&Stanza::Iq(Iq { -                        from: Some(jid.clone()), -                        id: "c2s2".to_string(), -                        to: Some(server.clone().try_into().unwrap()), -                        r#type: IqType::Get, -                        lang: None, -                        query: Some(Query::Ping(Ping)), -                        errors: Vec::new(), -                    })) -                    .await -                    .unwrap(); -            }, -            async { -                for _ in 0..2 { -                    let stanza = read.read::<Stanza>().await.unwrap(); -                    info!("ping reply: {:#?}", stanza); -                } -            } -        ); -    } -} diff --git a/jabber/src/connection.rs b/jabber/src/connection.rs deleted file mode 100644 index b185eca..0000000 --- a/jabber/src/connection.rs +++ /dev/null @@ -1,182 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::str; -use std::str::FromStr; - -use tokio::net::TcpStream; -use tokio_native_tls::native_tls::TlsConnector; -// TODO: use rustls -use tokio_native_tls::TlsStream; -use tracing::{debug, info, instrument, trace}; - -use crate::Result; -use crate::{Error, JID}; - -pub type Tls = TlsStream<TcpStream>; -pub type Unencrypted = TcpStream; - -#[derive(Debug)] -pub enum Connection { -    Encrypted(Tls), -    Unencrypted(Unencrypted), -} - -impl Connection { -    // #[instrument] -    /// stream not started -    // pub async fn ensure_tls(self) -> Result<Jabber<Tls>> { -    //     match self { -    //         Connection::Encrypted(j) => Ok(j), -    //         Connection::Unencrypted(mut j) => { -    //             j.start_stream().await?; -    //             info!("upgrading connection to tls"); -    //             j.get_features().await?; -    //             let j = j.starttls().await?; -    //             Ok(j) -    //         } -    //     } -    // } - -    pub async fn connect_user(jid: impl AsRef<str>) -> Result<Self> { -        let jid: JID = JID::from_str(jid.as_ref())?; -        let server = jid.domainpart.clone(); -        Self::connect(&server).await -    } - -    #[instrument] -    pub async fn connect(server: impl AsRef<str> + std::fmt::Debug) -> Result<Self> { -        info!("connecting to {}", server.as_ref()); -        let sockets = Self::get_sockets(server.as_ref()).await; -        debug!("discovered sockets: {:?}", sockets); -        for (socket_addr, tls) in sockets { -            match tls { -                true => { -                    if let Ok(connection) = Self::connect_tls(socket_addr, server.as_ref()).await { -                        info!("connected via encrypted stream to {}", socket_addr); -                        // let (readhalf, writehalf) = tokio::io::split(connection); -                        return Ok(Self::Encrypted(connection)); -                    } -                } -                false => { -                    if let Ok(connection) = Self::connect_unencrypted(socket_addr).await { -                        info!("connected via unencrypted stream to {}", socket_addr); -                        // let (readhalf, writehalf) = tokio::io::split(connection); -                        return Ok(Self::Unencrypted(connection)); -                    } -                } -            } -        } -        Err(Error::Connection) -    } - -    #[instrument] -    async fn get_sockets(address: &str) -> Vec<(SocketAddr, bool)> { -        let mut socket_addrs = Vec::new(); - -        // if it's a socket/ip then just return that - -        // socket -        trace!("checking if address is a socket address"); -        if let Ok(socket_addr) = SocketAddr::from_str(address) { -            debug!("{} is a socket address", address); -            match socket_addr.port() { -                5223 => socket_addrs.push((socket_addr, true)), -                _ => socket_addrs.push((socket_addr, false)), -            } - -            return socket_addrs; -        } -        // ip -        trace!("checking if address is an ip"); -        if let Ok(ip) = IpAddr::from_str(address) { -            debug!("{} is an ip", address); -            socket_addrs.push((SocketAddr::new(ip, 5222), false)); -            socket_addrs.push((SocketAddr::new(ip, 5223), true)); -            return socket_addrs; -        } - -        // otherwise resolve -        debug!("resolving {}", address); -        if let Ok(resolver) = trust_dns_resolver::AsyncResolver::tokio_from_system_conf() { -            if let Ok(lookup) = resolver -                .srv_lookup(format!("_xmpp-client._tcp.{}", address)) -                .await -            { -                for srv in lookup { -                    resolver -                        .lookup_ip(srv.target().to_owned()) -                        .await -                        .map(|ips| { -                            for ip in ips { -                                socket_addrs.push((SocketAddr::new(ip, srv.port()), false)) -                            } -                        }); -                } -            } -            if let Ok(lookup) = resolver -                .srv_lookup(format!("_xmpps-client._tcp.{}", address)) -                .await -            { -                for srv in lookup { -                    resolver -                        .lookup_ip(srv.target().to_owned()) -                        .await -                        .map(|ips| { -                            for ip in ips { -                                socket_addrs.push((SocketAddr::new(ip, srv.port()), true)) -                            } -                        }); -                } -            } - -            // in case cannot connect through SRV records -            resolver.lookup_ip(address).await.map(|ips| { -                for ip in ips { -                    socket_addrs.push((SocketAddr::new(ip, 5222), false)); -                    socket_addrs.push((SocketAddr::new(ip, 5223), true)); -                } -            }); -        } -        socket_addrs -    } - -    /// establishes a connection to the server -    #[instrument] -    pub async fn connect_tls(socket_addr: SocketAddr, domain_name: &str) -> Result<Tls> { -        let socket = TcpStream::connect(socket_addr) -            .await -            .map_err(|_| Error::Connection)?; -        let connector = TlsConnector::new().map_err(|_| Error::Connection)?; -        tokio_native_tls::TlsConnector::from(connector) -            .connect(domain_name, socket) -            .await -            .map_err(|_| Error::Connection) -    } - -    #[instrument] -    pub async fn connect_unencrypted(socket_addr: SocketAddr) -> Result<Unencrypted> { -        TcpStream::connect(socket_addr) -            .await -            .map_err(|_| Error::Connection) -    } -} - -#[cfg(test)] -mod tests { -    use super::*; -    use test_log::test; - -    #[test(tokio::test)] -    async fn connect() { -        Connection::connect("blos.sm").await.unwrap(); -    } - -    // #[test(tokio::test)] -    // async fn test_tls() { -    //     Connection::connect("blos.sm", None, None) -    //         .await -    //         .unwrap() -    //         .ensure_tls() -    //         .await -    //         .unwrap(); -    // } -} diff --git a/jabber/src/error.rs b/jabber/src/error.rs deleted file mode 100644 index ec60778..0000000 --- a/jabber/src/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::str::Utf8Error; -use std::sync::Arc; - -use jid::ParseError; -use rsasl::mechname::MechanismNameError; -use stanza::client::error::Error as ClientError; -use stanza::sasl::Failure; -use stanza::stream::Error as StreamError; -use thiserror::Error; - -#[derive(Error, Debug, Clone)] -pub enum Error { -    #[error("connection")] -    Connection, -    #[error("utf8 decode: {0}")] -    Utf8Decode(#[from] Utf8Error), -    #[error("negotiation")] -    Negotiation, -    #[error("tls required")] -    TlsRequired, -    #[error("already connected with tls")] -    AlreadyTls, -    // TODO: specify unsupported feature -    #[error("unsupported feature")] -    Unsupported, -    #[error("jid missing localpart")] -    NoLocalpart, -    #[error("received unexpected element: {0:?}")] -    UnexpectedElement(peanuts::Element), -    #[error("xml error: {0}")] -    XML(#[from] peanuts::Error), -    #[error("sasl error: {0}")] -    SASL(#[from] SASLError), -    #[error("jid error: {0}")] -    JID(#[from] ParseError), -    #[error("client stanza error: {0}")] -    ClientError(#[from] ClientError), -    #[error("stream error: {0}")] -    StreamError(#[from] StreamError), -    #[error("error missing")] -    MissingError, -} - -#[derive(Error, Debug, Clone)] -pub enum SASLError { -    #[error("sasl error: {0}")] -    SASL(Arc<rsasl::prelude::SASLError>), -    #[error("mechanism error: {0}")] -    MechanismName(#[from] MechanismNameError), -    #[error("authentication failure: {0}")] -    Authentication(#[from] Failure), -} - -impl From<rsasl::prelude::SASLError> for SASLError { -    fn from(e: rsasl::prelude::SASLError) -> Self { -        Self::SASL(Arc::new(e)) -    } -} diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs deleted file mode 100644 index 302350d..0000000 --- a/jabber/src/jabber_stream.rs +++ /dev/null @@ -1,482 +0,0 @@ -use std::str::{self, FromStr}; -use std::sync::Arc; - -use jid::JID; -use peanuts::element::IntoElement; -use peanuts::{Reader, Writer}; -use rsasl::prelude::{Mechname, SASLClient, SASLConfig}; -use stanza::bind::{Bind, BindType, FullJidType, ResourceType}; -use stanza::client::iq::{Iq, IqType, Query}; -use stanza::client::Stanza; -use stanza::sasl::{Auth, Challenge, Mechanisms, Response, ServerResponse}; -use stanza::starttls::{Proceed, StartTls}; -use stanza::stream::{Features, Stream}; -use stanza::XML_VERSION; -use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; -use tokio_native_tls::native_tls::TlsConnector; -use tracing::{debug, instrument}; - -use crate::connection::{Tls, Unencrypted}; -use crate::error::Error; -use crate::Result; - -pub mod bound_stream; - -// open stream (streams started) -pub struct JabberStream<S> { -    reader: JabberReader<S>, -    writer: JabberWriter<S>, -} - -impl<S> JabberStream<S> { -    fn split(self) -> (JabberReader<S>, JabberWriter<S>) { -        let reader = self.reader; -        let writer = self.writer; -        (reader, writer) -    } -} - -pub struct JabberReader<S>(Reader<ReadHalf<S>>); - -impl<S> JabberReader<S> { -    // TODO: consider taking a readhalf and creating peanuts::Reader here, only one inner -    fn new(reader: Reader<ReadHalf<S>>) -> Self { -        Self(reader) -    } - -    fn unsplit(self, writer: JabberWriter<S>) -> JabberStream<S> { -        JabberStream { -            reader: self, -            writer, -        } -    } - -    fn into_inner(self) -> Reader<ReadHalf<S>> { -        self.0 -    } -} - -impl<S> JabberReader<S> -where -    S: AsyncRead + Unpin, -{ -    pub async fn try_close(&mut self) -> Result<()> { -        self.read_end_tag().await?; -        Ok(()) -    } -} - -impl<S> std::ops::Deref for JabberReader<S> { -    type Target = Reader<ReadHalf<S>>; - -    fn deref(&self) -> &Self::Target { -        &self.0 -    } -} - -impl<S> std::ops::DerefMut for JabberReader<S> { -    fn deref_mut(&mut self) -> &mut Self::Target { -        &mut self.0 -    } -} - -pub struct JabberWriter<S>(Writer<WriteHalf<S>>); - -impl<S> JabberWriter<S> { -    fn new(writer: Writer<WriteHalf<S>>) -> Self { -        Self(writer) -    } - -    fn unsplit(self, reader: JabberReader<S>) -> JabberStream<S> { -        JabberStream { -            reader, -            writer: self, -        } -    } - -    fn into_inner(self) -> Writer<WriteHalf<S>> { -        self.0 -    } -} - -impl<S> JabberWriter<S> -where -    S: AsyncWrite + Unpin + Send, -{ -    pub async fn try_close(&mut self) -> Result<()> { -        self.write_end().await?; -        Ok(()) -    } -} - -impl<S> std::ops::Deref for JabberWriter<S> { -    type Target = Writer<WriteHalf<S>>; - -    fn deref(&self) -> &Self::Target { -        &self.0 -    } -} - -impl<S> std::ops::DerefMut for JabberWriter<S> { -    fn deref_mut(&mut self) -> &mut Self::Target { -        &mut self.0 -    } -} - -impl<S> JabberStream<S> -where -    S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug, -    JabberStream<S>: std::fmt::Debug, -{ -    #[instrument] -    pub async fn sasl(mut self, mechanisms: Mechanisms, sasl_config: Arc<SASLConfig>) -> Result<S> { -        let sasl = SASLClient::new(sasl_config); -        let mut offered_mechs: Vec<&Mechname> = Vec::new(); -        for mechanism in &mechanisms.mechanisms { -            offered_mechs -                .push(Mechname::parse(mechanism.as_bytes()).map_err(|e| Error::SASL(e.into()))?) -        } -        debug!("{:?}", offered_mechs); -        let mut session = sasl -            .start_suggested(&offered_mechs) -            .map_err(|e| Error::SASL(e.into()))?; -        let selected_mechanism = session.get_mechname().as_str().to_owned(); -        debug!("selected mech: {:?}", selected_mechanism); -        let mut data: Option<Vec<u8>>; - -        if !session.are_we_first() { -            // if not first mention the mechanism then get challenge data -            // mention mechanism -            let auth = Auth { -                mechanism: selected_mechanism, -                sasl_data: "=".to_string(), -            }; -            self.writer.write_full(&auth).await?; -            // get challenge data -            let challenge: Challenge = self.reader.read().await?; -            debug!("challenge: {:?}", challenge); -            data = Some((*challenge).as_bytes().to_vec()); -            debug!("we didn't go first"); -        } else { -            // if first, mention mechanism and send data -            let mut sasl_data = Vec::new(); -            session.step64(None, &mut sasl_data).unwrap(); -            let auth = Auth { -                mechanism: selected_mechanism, -                sasl_data: str::from_utf8(&sasl_data)?.to_string(), -            }; -            debug!("{:?}", auth); -            self.writer.write_full(&auth).await?; - -            let server_response: ServerResponse = self.reader.read().await?; -            debug!("server_response: {:#?}", server_response); -            match server_response { -                ServerResponse::Challenge(challenge) => { -                    data = Some((*challenge).as_bytes().to_vec()) -                } -                ServerResponse::Success(success) => { -                    data = success.clone().map(|success| success.as_bytes().to_vec()) -                } -                ServerResponse::Failure(failure) => return Err(Error::SASL(failure.into())), -            } -            debug!("we went first"); -        } - -        // stepping the authentication exchange to completion -        if data != None { -            debug!("data: {:?}", data); -            let mut sasl_data = Vec::new(); -            while { -                // decide if need to send more data over -                let state = session -                    .step64(data.as_deref(), &mut sasl_data) -                    .expect("step errored!"); -                state.is_running() -            } { -                // While we aren't finished, receive more data from the other party -                let response = Response::new(str::from_utf8(&sasl_data)?.to_string()); -                debug!("response: {:?}", response); -                self.writer.write_full(&response).await?; -                debug!("response written"); - -                let server_response: ServerResponse = self.reader.read().await?; -                debug!("server_response: {:#?}", server_response); -                match server_response { -                    ServerResponse::Challenge(challenge) => { -                        data = Some((*challenge).as_bytes().to_vec()) -                    } -                    ServerResponse::Success(success) => { -                        data = success.clone().map(|success| success.as_bytes().to_vec()) -                    } -                    ServerResponse::Failure(failure) => return Err(Error::SASL(failure.into())), -                } -            } -        } -        let writer = self.writer.into_inner().into_inner(); -        let reader = self.reader.into_inner().into_inner(); -        let stream = reader.unsplit(writer); -        Ok(stream) -    } - -    #[instrument] -    pub async fn bind(mut self, jid: &mut JID) -> Result<Self> { -        let iq_id = nanoid::nanoid!(); -        if let Some(resource) = &jid.resourcepart { -            let iq = Iq { -                from: None, -                id: iq_id.clone(), -                to: None, -                r#type: IqType::Set, -                lang: None, -                query: Some(Query::Bind(Bind { -                    r#type: Some(BindType::Resource(ResourceType(resource.to_string()))), -                })), -                errors: Vec::new(), -            }; -            self.writer.write_full(&iq).await?; -            let result: Iq = self.reader.read().await?; -            match result { -                Iq { -                    from: _, -                    id, -                    to: _, -                    r#type: IqType::Result, -                    lang: _, -                    query: -                        Some(Query::Bind(Bind { -                            r#type: Some(BindType::Jid(FullJidType(new_jid))), -                        })), -                    errors: _, -                } if id == iq_id => { -                    *jid = new_jid; -                    return Ok(self); -                } -                Iq { -                    from: _, -                    id, -                    to: _, -                    r#type: IqType::Error, -                    lang: _, -                    query: None, -                    errors, -                } if id == iq_id => { -                    return Err(Error::ClientError( -                        errors.first().ok_or(Error::MissingError)?.clone(), -                    )) -                } -                _ => return Err(Error::UnexpectedElement(result.into_element())), -            } -        } else { -            let iq = Iq { -                from: None, -                id: iq_id.clone(), -                to: None, -                r#type: IqType::Set, -                lang: None, -                query: Some(Query::Bind(Bind { r#type: None })), -                errors: Vec::new(), -            }; -            self.writer.write_full(&iq).await?; -            let result: Iq = self.reader.read().await?; -            match result { -                Iq { -                    from: _, -                    id, -                    to: _, -                    r#type: IqType::Result, -                    lang: _, -                    query: -                        Some(Query::Bind(Bind { -                            r#type: Some(BindType::Jid(FullJidType(new_jid))), -                        })), -                    errors: _, -                } if id == iq_id => { -                    *jid = new_jid; -                    return Ok(self); -                } -                Iq { -                    from: _, -                    id, -                    to: _, -                    r#type: IqType::Error, -                    lang: _, -                    query: None, -                    errors, -                } if id == iq_id => { -                    return Err(Error::ClientError( -                        errors.first().ok_or(Error::MissingError)?.clone(), -                    )) -                } -                _ => return Err(Error::UnexpectedElement(result.into_element())), -            } -        } -    } - -    #[instrument] -    pub async fn start_stream(connection: S, server: &mut String) -> Result<Self> { -        // client to server -        let (reader, writer) = tokio::io::split(connection); -        let mut reader = JabberReader::new(Reader::new(reader)); -        let mut writer = JabberWriter::new(Writer::new(writer)); - -        // declaration -        writer.write_declaration(XML_VERSION).await?; - -        // opening stream element -        let stream = Stream::new_client( -            None, -            JID::from_str(server.as_ref())?, -            None, -            "en".to_string(), -        ); -        writer.write_start(&stream).await?; - -        // server to client - -        // may or may not send a declaration -        let _decl = reader.read_prolog().await?; - -        // receive stream element and validate -        let stream: Stream = reader.read_start().await?; -        debug!("got stream: {:?}", stream); -        if let Some(from) = stream.from { -            *server = from.to_string(); -        } - -        Ok(Self { reader, writer }) -    } - -    #[instrument] -    pub async fn get_features(mut self) -> Result<(Features, Self)> { -        debug!("getting features"); -        let features: Features = self.reader.read().await?; -        debug!("got features: {:?}", features); -        Ok((features, self)) -    } - -    pub fn into_inner(self) -> S { -        self.reader -            .into_inner() -            .into_inner() -            .unsplit(self.writer.into_inner().into_inner()) -    } - -    pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> { -        self.writer.write(stanza).await?; -        Ok(()) -    } -} - -impl JabberStream<Unencrypted> { -    #[instrument] -    pub async fn starttls(mut self, domain: impl AsRef<str> + std::fmt::Debug) -> Result<Tls> { -        self.writer -            .write_full(&StartTls { required: false }) -            .await?; -        let proceed: Proceed = self.reader.read().await?; -        debug!("got proceed: {:?}", proceed); -        let connector = TlsConnector::new().unwrap(); -        let stream = self -            .reader -            .into_inner() -            .into_inner() -            .unsplit(self.writer.into_inner().into_inner()); -        if let Ok(tls_stream) = tokio_native_tls::TlsConnector::from(connector) -            .connect(domain.as_ref(), stream) -            .await -        { -            return Ok(tls_stream); -        } else { -            return Err(Error::Connection); -        } -    } -} - -impl std::fmt::Debug for JabberStream<Tls> { -    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -        f.debug_struct("Jabber") -            .field("connection", &"tls") -            .finish() -    } -} - -impl std::fmt::Debug for JabberStream<Unencrypted> { -    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -        f.debug_struct("Jabber") -            .field("connection", &"unencrypted") -            .finish() -    } -} - -#[cfg(test)] -mod tests { -    use test_log::test; - -    #[test(tokio::test)] -    async fn start_stream() { -        // let connection = Connection::connect("blos.sm", None, None).await.unwrap(); -        // match connection { -        //     Connection::Encrypted(mut c) => c.start_stream().await.unwrap(), -        //     Connection::Unencrypted(mut c) => c.start_stream().await.unwrap(), -        // } -    } - -    #[test(tokio::test)] -    async fn sasl() { -        // let mut jabber = Connection::connect_user("test@blos.sm", "slayed".to_string()) -        //     .await -        //     .unwrap() -        //     .ensure_tls() -        //     .await -        //     .unwrap(); -        // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap(); -        // println!("data: {}", text); -        // jabber.start_stream().await.unwrap(); - -        // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap(); -        // println!("data: {}", text); -        // jabber.reader.read_buf().await.unwrap(); -        // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap(); -        // println!("data: {}", text); - -        // let features = jabber.get_features().await.unwrap(); -        // let (sasl_config, feature) = ( -        //     jabber.auth.clone().unwrap(), -        //     features -        //         .features -        //         .iter() -        //         .find(|feature| matches!(feature, Feature::Sasl(_))) -        //         .unwrap(), -        // ); -        // match feature { -        //     Feature::StartTls(_start_tls) => todo!(), -        //     Feature::Sasl(mechanisms) => { -        //         jabber.sasl(mechanisms.clone(), sasl_config).await.unwrap(); -        //     } -        //     Feature::Bind => todo!(), -        //     Feature::Unknown => todo!(), -        // } -    } - -    #[tokio::test] -    async fn sink() { -        // let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap(); -        // client.connect().await.unwrap(); -        // let stream = client.inner().unwrap(); -        // let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move { -        //     stream.writer.write(&stanza).await?; -        //     Ok::<JabberStream<Tls>, Error>(stream) -        // }); -        // todo!() -        // let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string()) -        //     .await -        //     .unwrap() -        //     .ensure_tls() -        //     .await -        //     .unwrap() -        //     .negotiate() -        //     .await -        //     .unwrap(); -        // sleep(Duration::from_secs(5)).await -    } -} diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs deleted file mode 100644 index 25b79ff..0000000 --- a/jabber/src/jabber_stream/bound_stream.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::ops::{Deref, DerefMut}; - -use tokio::io::{AsyncRead, AsyncWrite}; - -use super::{JabberReader, JabberStream, JabberWriter}; - -pub struct BoundJabberStream<S>(JabberStream<S>); - -impl<S> Deref for BoundJabberStream<S> -where -    S: AsyncWrite + AsyncRead + Unpin + Send, -{ -    type Target = JabberStream<S>; - -    fn deref(&self) -> &Self::Target { -        &self.0 -    } -} - -impl<S> DerefMut for BoundJabberStream<S> -where -    S: AsyncWrite + AsyncRead + Unpin + Send, -{ -    fn deref_mut(&mut self) -> &mut Self::Target { -        &mut self.0 -    } -} - -impl<S> BoundJabberStream<S> { -    pub fn split(self) -> (BoundJabberReader<S>, BoundJabberWriter<S>) { -        let (reader, writer) = self.0.split(); -        (BoundJabberReader(reader), BoundJabberWriter(writer)) -    } -} - -pub struct BoundJabberReader<S>(JabberReader<S>); - -impl<S> BoundJabberReader<S> { -    pub fn unsplit(self, writer: BoundJabberWriter<S>) -> BoundJabberStream<S> { -        BoundJabberStream(self.0.unsplit(writer.0)) -    } -} - -impl<S> std::ops::Deref for BoundJabberReader<S> { -    type Target = JabberReader<S>; - -    fn deref(&self) -> &Self::Target { -        &self.0 -    } -} - -impl<S> std::ops::DerefMut for BoundJabberReader<S> { -    fn deref_mut(&mut self) -> &mut Self::Target { -        &mut self.0 -    } -} - -pub struct BoundJabberWriter<S>(JabberWriter<S>); - -impl<S> BoundJabberWriter<S> { -    pub fn unsplit(self, reader: BoundJabberReader<S>) -> BoundJabberStream<S> { -        BoundJabberStream(self.0.unsplit(reader.0)) -    } -} - -impl<S> std::ops::Deref for BoundJabberWriter<S> { -    type Target = JabberWriter<S>; - -    fn deref(&self) -> &Self::Target { -        &self.0 -    } -} - -impl<S> std::ops::DerefMut for BoundJabberWriter<S> { -    fn deref_mut(&mut self) -> &mut Self::Target { -        &mut self.0 -    } -} - -impl<S> JabberStream<S> -where -    S: AsyncWrite + AsyncRead + Unpin + Send, -{ -    pub fn to_bound_jabber(self) -> BoundJabberStream<S> { -        BoundJabberStream(self) -    } -} diff --git a/jabber/src/lib.rs b/jabber/src/lib.rs deleted file mode 100644 index 8855ca7..0000000 --- a/jabber/src/lib.rs +++ /dev/null @@ -1,25 +0,0 @@ -#![allow(unused_must_use)] -// #![feature(let_chains)] - -// TODO: logging (dropped errors) -pub mod client; -pub mod connection; -pub mod error; -pub mod jabber_stream; - -pub use connection::Connection; -pub use error::Error; -pub use jabber_stream::JabberStream; -pub use jid::JID; - -pub type Result<T> = std::result::Result<T, Error>; - -pub use client::connect_and_login; - -#[cfg(test)] -mod tests { -    // #[tokio::test] -    // async fn test_login() { -    //     crate::login("test@blos.sm/clown", "slayed").await.unwrap(); -    // } -}  | 
