aboutsummaryrefslogtreecommitdiffstats
path: root/jabber/src
diff options
context:
space:
mode:
Diffstat (limited to 'jabber/src')
-rw-r--r--jabber/src/client.rs181
-rw-r--r--jabber/src/connection.rs182
-rw-r--r--jabber/src/error.rs58
-rw-r--r--jabber/src/jabber_stream.rs482
-rw-r--r--jabber/src/jabber_stream/bound_stream.rs87
-rw-r--r--jabber/src/lib.rs25
6 files changed, 0 insertions, 1015 deletions
diff --git a/jabber/src/client.rs b/jabber/src/client.rs
deleted file mode 100644
index de2be08..0000000
--- a/jabber/src/client.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use rsasl::config::SASLConfig;
-use stanza::{
- sasl::Mechanisms,
- stream::{Feature, Features},
-};
-
-use crate::{
- connection::{Tls, Unencrypted},
- jabber_stream::bound_stream::BoundJabberStream,
- Connection, Error, JabberStream, Result, JID,
-};
-
-pub async fn connect_and_login(
- jid: &mut JID,
- password: impl AsRef<str>,
- server: &mut String,
-) -> Result<BoundJabberStream<Tls>> {
- let auth = SASLConfig::with_credentials(
- None,
- jid.localpart.clone().ok_or(Error::NoLocalpart)?,
- password.as_ref().to_string(),
- )
- .map_err(|e| Error::SASL(e.into()))?;
- let mut conn_state = Connecting::start(&server).await?;
- loop {
- match conn_state {
- Connecting::InsecureConnectionEstablised(tcp_stream) => {
- conn_state = Connecting::InsecureStreamStarted(
- JabberStream::start_stream(tcp_stream, server).await?,
- )
- }
- Connecting::InsecureStreamStarted(jabber_stream) => {
- conn_state = Connecting::InsecureGotFeatures(jabber_stream.get_features().await?)
- }
- Connecting::InsecureGotFeatures((features, jabber_stream)) => {
- match features.negotiate().ok_or(Error::Negotiation)? {
- Feature::StartTls(_start_tls) => {
- conn_state = Connecting::StartTls(jabber_stream)
- }
- // TODO: better error
- _ => return Err(Error::TlsRequired),
- }
- }
- Connecting::StartTls(jabber_stream) => {
- conn_state =
- Connecting::ConnectionEstablished(jabber_stream.starttls(&server).await?)
- }
- Connecting::ConnectionEstablished(tls_stream) => {
- conn_state =
- Connecting::StreamStarted(JabberStream::start_stream(tls_stream, server).await?)
- }
- Connecting::StreamStarted(jabber_stream) => {
- conn_state = Connecting::GotFeatures(jabber_stream.get_features().await?)
- }
- Connecting::GotFeatures((features, jabber_stream)) => {
- match features.negotiate().ok_or(Error::Negotiation)? {
- Feature::StartTls(_start_tls) => return Err(Error::AlreadyTls),
- Feature::Sasl(mechanisms) => {
- conn_state = Connecting::Sasl(mechanisms, jabber_stream)
- }
- Feature::Bind => conn_state = Connecting::Bind(jabber_stream),
- Feature::Unknown => return Err(Error::Unsupported),
- }
- }
- Connecting::Sasl(mechanisms, jabber_stream) => {
- conn_state = Connecting::ConnectionEstablished(
- jabber_stream.sasl(mechanisms, auth.clone()).await?,
- )
- }
- Connecting::Bind(jabber_stream) => {
- return Ok(jabber_stream.bind(jid).await?.to_bound_jabber());
- }
- }
- }
-}
-
-pub enum Connecting {
- InsecureConnectionEstablised(Unencrypted),
- InsecureStreamStarted(JabberStream<Unencrypted>),
- InsecureGotFeatures((Features, JabberStream<Unencrypted>)),
- StartTls(JabberStream<Unencrypted>),
- ConnectionEstablished(Tls),
- StreamStarted(JabberStream<Tls>),
- GotFeatures((Features, JabberStream<Tls>)),
- Sasl(Mechanisms, JabberStream<Tls>),
- Bind(JabberStream<Tls>),
-}
-
-impl Connecting {
- pub async fn start(server: &str) -> Result<Self> {
- match Connection::connect(server).await? {
- Connection::Encrypted(tls_stream) => Ok(Connecting::ConnectionEstablished(tls_stream)),
- Connection::Unencrypted(tcp_stream) => {
- Ok(Connecting::InsecureConnectionEstablised(tcp_stream))
- }
- }
- }
-}
-
-pub enum InsecureConnecting {
- Disconnected,
- ConnectionEstablished(Connection),
- PreStarttls(JabberStream<Unencrypted>),
- PreAuthenticated(JabberStream<Tls>),
- Authenticated(Tls),
- PreBound(JabberStream<Tls>),
- Bound(JabberStream<Tls>),
-}
-
-#[cfg(test)]
-mod tests {
- use std::time::Duration;
-
- use jid::JID;
- use stanza::{
- client::{
- iq::{Iq, IqType, Query},
- Stanza,
- },
- xep_0199::Ping,
- };
- use test_log::test;
- use tokio::time::sleep;
- use tracing::info;
-
- use super::connect_and_login;
-
- #[test(tokio::test)]
- async fn login() {
- let mut jid: JID = "test@blos.sm".try_into().unwrap();
- let _client = connect_and_login(&mut jid, "slayed", &mut "blos.sm".to_string())
- .await
- .unwrap();
- sleep(Duration::from_secs(5)).await
- }
-
- #[test(tokio::test)]
- async fn ping_parallel() {
- let mut jid: JID = "test@blos.sm".try_into().unwrap();
- let mut server = "blos.sm".to_string();
- let client = connect_and_login(&mut jid, "slayed", &mut server)
- .await
- .unwrap();
- let (mut read, mut write) = client.split();
-
- tokio::join!(
- async {
- write
- .write(&Stanza::Iq(Iq {
- from: Some(jid.clone()),
- id: "c2s1".to_string(),
- to: Some(server.clone().try_into().unwrap()),
- r#type: IqType::Get,
- lang: None,
- query: Some(Query::Ping(Ping)),
- errors: Vec::new(),
- }))
- .await
- .unwrap();
- write
- .write(&Stanza::Iq(Iq {
- from: Some(jid.clone()),
- id: "c2s2".to_string(),
- to: Some(server.clone().try_into().unwrap()),
- r#type: IqType::Get,
- lang: None,
- query: Some(Query::Ping(Ping)),
- errors: Vec::new(),
- }))
- .await
- .unwrap();
- },
- async {
- for _ in 0..2 {
- let stanza = read.read::<Stanza>().await.unwrap();
- info!("ping reply: {:#?}", stanza);
- }
- }
- );
- }
-}
diff --git a/jabber/src/connection.rs b/jabber/src/connection.rs
deleted file mode 100644
index b185eca..0000000
--- a/jabber/src/connection.rs
+++ /dev/null
@@ -1,182 +0,0 @@
-use std::net::{IpAddr, SocketAddr};
-use std::str;
-use std::str::FromStr;
-
-use tokio::net::TcpStream;
-use tokio_native_tls::native_tls::TlsConnector;
-// TODO: use rustls
-use tokio_native_tls::TlsStream;
-use tracing::{debug, info, instrument, trace};
-
-use crate::Result;
-use crate::{Error, JID};
-
-pub type Tls = TlsStream<TcpStream>;
-pub type Unencrypted = TcpStream;
-
-#[derive(Debug)]
-pub enum Connection {
- Encrypted(Tls),
- Unencrypted(Unencrypted),
-}
-
-impl Connection {
- // #[instrument]
- /// stream not started
- // pub async fn ensure_tls(self) -> Result<Jabber<Tls>> {
- // match self {
- // Connection::Encrypted(j) => Ok(j),
- // Connection::Unencrypted(mut j) => {
- // j.start_stream().await?;
- // info!("upgrading connection to tls");
- // j.get_features().await?;
- // let j = j.starttls().await?;
- // Ok(j)
- // }
- // }
- // }
-
- pub async fn connect_user(jid: impl AsRef<str>) -> Result<Self> {
- let jid: JID = JID::from_str(jid.as_ref())?;
- let server = jid.domainpart.clone();
- Self::connect(&server).await
- }
-
- #[instrument]
- pub async fn connect(server: impl AsRef<str> + std::fmt::Debug) -> Result<Self> {
- info!("connecting to {}", server.as_ref());
- let sockets = Self::get_sockets(server.as_ref()).await;
- debug!("discovered sockets: {:?}", sockets);
- for (socket_addr, tls) in sockets {
- match tls {
- true => {
- if let Ok(connection) = Self::connect_tls(socket_addr, server.as_ref()).await {
- info!("connected via encrypted stream to {}", socket_addr);
- // let (readhalf, writehalf) = tokio::io::split(connection);
- return Ok(Self::Encrypted(connection));
- }
- }
- false => {
- if let Ok(connection) = Self::connect_unencrypted(socket_addr).await {
- info!("connected via unencrypted stream to {}", socket_addr);
- // let (readhalf, writehalf) = tokio::io::split(connection);
- return Ok(Self::Unencrypted(connection));
- }
- }
- }
- }
- Err(Error::Connection)
- }
-
- #[instrument]
- async fn get_sockets(address: &str) -> Vec<(SocketAddr, bool)> {
- let mut socket_addrs = Vec::new();
-
- // if it's a socket/ip then just return that
-
- // socket
- trace!("checking if address is a socket address");
- if let Ok(socket_addr) = SocketAddr::from_str(address) {
- debug!("{} is a socket address", address);
- match socket_addr.port() {
- 5223 => socket_addrs.push((socket_addr, true)),
- _ => socket_addrs.push((socket_addr, false)),
- }
-
- return socket_addrs;
- }
- // ip
- trace!("checking if address is an ip");
- if let Ok(ip) = IpAddr::from_str(address) {
- debug!("{} is an ip", address);
- socket_addrs.push((SocketAddr::new(ip, 5222), false));
- socket_addrs.push((SocketAddr::new(ip, 5223), true));
- return socket_addrs;
- }
-
- // otherwise resolve
- debug!("resolving {}", address);
- if let Ok(resolver) = trust_dns_resolver::AsyncResolver::tokio_from_system_conf() {
- if let Ok(lookup) = resolver
- .srv_lookup(format!("_xmpp-client._tcp.{}", address))
- .await
- {
- for srv in lookup {
- resolver
- .lookup_ip(srv.target().to_owned())
- .await
- .map(|ips| {
- for ip in ips {
- socket_addrs.push((SocketAddr::new(ip, srv.port()), false))
- }
- });
- }
- }
- if let Ok(lookup) = resolver
- .srv_lookup(format!("_xmpps-client._tcp.{}", address))
- .await
- {
- for srv in lookup {
- resolver
- .lookup_ip(srv.target().to_owned())
- .await
- .map(|ips| {
- for ip in ips {
- socket_addrs.push((SocketAddr::new(ip, srv.port()), true))
- }
- });
- }
- }
-
- // in case cannot connect through SRV records
- resolver.lookup_ip(address).await.map(|ips| {
- for ip in ips {
- socket_addrs.push((SocketAddr::new(ip, 5222), false));
- socket_addrs.push((SocketAddr::new(ip, 5223), true));
- }
- });
- }
- socket_addrs
- }
-
- /// establishes a connection to the server
- #[instrument]
- pub async fn connect_tls(socket_addr: SocketAddr, domain_name: &str) -> Result<Tls> {
- let socket = TcpStream::connect(socket_addr)
- .await
- .map_err(|_| Error::Connection)?;
- let connector = TlsConnector::new().map_err(|_| Error::Connection)?;
- tokio_native_tls::TlsConnector::from(connector)
- .connect(domain_name, socket)
- .await
- .map_err(|_| Error::Connection)
- }
-
- #[instrument]
- pub async fn connect_unencrypted(socket_addr: SocketAddr) -> Result<Unencrypted> {
- TcpStream::connect(socket_addr)
- .await
- .map_err(|_| Error::Connection)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use test_log::test;
-
- #[test(tokio::test)]
- async fn connect() {
- Connection::connect("blos.sm").await.unwrap();
- }
-
- // #[test(tokio::test)]
- // async fn test_tls() {
- // Connection::connect("blos.sm", None, None)
- // .await
- // .unwrap()
- // .ensure_tls()
- // .await
- // .unwrap();
- // }
-}
diff --git a/jabber/src/error.rs b/jabber/src/error.rs
deleted file mode 100644
index ec60778..0000000
--- a/jabber/src/error.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use std::str::Utf8Error;
-use std::sync::Arc;
-
-use jid::ParseError;
-use rsasl::mechname::MechanismNameError;
-use stanza::client::error::Error as ClientError;
-use stanza::sasl::Failure;
-use stanza::stream::Error as StreamError;
-use thiserror::Error;
-
-#[derive(Error, Debug, Clone)]
-pub enum Error {
- #[error("connection")]
- Connection,
- #[error("utf8 decode: {0}")]
- Utf8Decode(#[from] Utf8Error),
- #[error("negotiation")]
- Negotiation,
- #[error("tls required")]
- TlsRequired,
- #[error("already connected with tls")]
- AlreadyTls,
- // TODO: specify unsupported feature
- #[error("unsupported feature")]
- Unsupported,
- #[error("jid missing localpart")]
- NoLocalpart,
- #[error("received unexpected element: {0:?}")]
- UnexpectedElement(peanuts::Element),
- #[error("xml error: {0}")]
- XML(#[from] peanuts::Error),
- #[error("sasl error: {0}")]
- SASL(#[from] SASLError),
- #[error("jid error: {0}")]
- JID(#[from] ParseError),
- #[error("client stanza error: {0}")]
- ClientError(#[from] ClientError),
- #[error("stream error: {0}")]
- StreamError(#[from] StreamError),
- #[error("error missing")]
- MissingError,
-}
-
-#[derive(Error, Debug, Clone)]
-pub enum SASLError {
- #[error("sasl error: {0}")]
- SASL(Arc<rsasl::prelude::SASLError>),
- #[error("mechanism error: {0}")]
- MechanismName(#[from] MechanismNameError),
- #[error("authentication failure: {0}")]
- Authentication(#[from] Failure),
-}
-
-impl From<rsasl::prelude::SASLError> for SASLError {
- fn from(e: rsasl::prelude::SASLError) -> Self {
- Self::SASL(Arc::new(e))
- }
-}
diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs
deleted file mode 100644
index 302350d..0000000
--- a/jabber/src/jabber_stream.rs
+++ /dev/null
@@ -1,482 +0,0 @@
-use std::str::{self, FromStr};
-use std::sync::Arc;
-
-use jid::JID;
-use peanuts::element::IntoElement;
-use peanuts::{Reader, Writer};
-use rsasl::prelude::{Mechname, SASLClient, SASLConfig};
-use stanza::bind::{Bind, BindType, FullJidType, ResourceType};
-use stanza::client::iq::{Iq, IqType, Query};
-use stanza::client::Stanza;
-use stanza::sasl::{Auth, Challenge, Mechanisms, Response, ServerResponse};
-use stanza::starttls::{Proceed, StartTls};
-use stanza::stream::{Features, Stream};
-use stanza::XML_VERSION;
-use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
-use tokio_native_tls::native_tls::TlsConnector;
-use tracing::{debug, instrument};
-
-use crate::connection::{Tls, Unencrypted};
-use crate::error::Error;
-use crate::Result;
-
-pub mod bound_stream;
-
-// open stream (streams started)
-pub struct JabberStream<S> {
- reader: JabberReader<S>,
- writer: JabberWriter<S>,
-}
-
-impl<S> JabberStream<S> {
- fn split(self) -> (JabberReader<S>, JabberWriter<S>) {
- let reader = self.reader;
- let writer = self.writer;
- (reader, writer)
- }
-}
-
-pub struct JabberReader<S>(Reader<ReadHalf<S>>);
-
-impl<S> JabberReader<S> {
- // TODO: consider taking a readhalf and creating peanuts::Reader here, only one inner
- fn new(reader: Reader<ReadHalf<S>>) -> Self {
- Self(reader)
- }
-
- fn unsplit(self, writer: JabberWriter<S>) -> JabberStream<S> {
- JabberStream {
- reader: self,
- writer,
- }
- }
-
- fn into_inner(self) -> Reader<ReadHalf<S>> {
- self.0
- }
-}
-
-impl<S> JabberReader<S>
-where
- S: AsyncRead + Unpin,
-{
- pub async fn try_close(&mut self) -> Result<()> {
- self.read_end_tag().await?;
- Ok(())
- }
-}
-
-impl<S> std::ops::Deref for JabberReader<S> {
- type Target = Reader<ReadHalf<S>>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<S> std::ops::DerefMut for JabberReader<S> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-pub struct JabberWriter<S>(Writer<WriteHalf<S>>);
-
-impl<S> JabberWriter<S> {
- fn new(writer: Writer<WriteHalf<S>>) -> Self {
- Self(writer)
- }
-
- fn unsplit(self, reader: JabberReader<S>) -> JabberStream<S> {
- JabberStream {
- reader,
- writer: self,
- }
- }
-
- fn into_inner(self) -> Writer<WriteHalf<S>> {
- self.0
- }
-}
-
-impl<S> JabberWriter<S>
-where
- S: AsyncWrite + Unpin + Send,
-{
- pub async fn try_close(&mut self) -> Result<()> {
- self.write_end().await?;
- Ok(())
- }
-}
-
-impl<S> std::ops::Deref for JabberWriter<S> {
- type Target = Writer<WriteHalf<S>>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<S> std::ops::DerefMut for JabberWriter<S> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-impl<S> JabberStream<S>
-where
- S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug,
- JabberStream<S>: std::fmt::Debug,
-{
- #[instrument]
- pub async fn sasl(mut self, mechanisms: Mechanisms, sasl_config: Arc<SASLConfig>) -> Result<S> {
- let sasl = SASLClient::new(sasl_config);
- let mut offered_mechs: Vec<&Mechname> = Vec::new();
- for mechanism in &mechanisms.mechanisms {
- offered_mechs
- .push(Mechname::parse(mechanism.as_bytes()).map_err(|e| Error::SASL(e.into()))?)
- }
- debug!("{:?}", offered_mechs);
- let mut session = sasl
- .start_suggested(&offered_mechs)
- .map_err(|e| Error::SASL(e.into()))?;
- let selected_mechanism = session.get_mechname().as_str().to_owned();
- debug!("selected mech: {:?}", selected_mechanism);
- let mut data: Option<Vec<u8>>;
-
- if !session.are_we_first() {
- // if not first mention the mechanism then get challenge data
- // mention mechanism
- let auth = Auth {
- mechanism: selected_mechanism,
- sasl_data: "=".to_string(),
- };
- self.writer.write_full(&auth).await?;
- // get challenge data
- let challenge: Challenge = self.reader.read().await?;
- debug!("challenge: {:?}", challenge);
- data = Some((*challenge).as_bytes().to_vec());
- debug!("we didn't go first");
- } else {
- // if first, mention mechanism and send data
- let mut sasl_data = Vec::new();
- session.step64(None, &mut sasl_data).unwrap();
- let auth = Auth {
- mechanism: selected_mechanism,
- sasl_data: str::from_utf8(&sasl_data)?.to_string(),
- };
- debug!("{:?}", auth);
- self.writer.write_full(&auth).await?;
-
- let server_response: ServerResponse = self.reader.read().await?;
- debug!("server_response: {:#?}", server_response);
- match server_response {
- ServerResponse::Challenge(challenge) => {
- data = Some((*challenge).as_bytes().to_vec())
- }
- ServerResponse::Success(success) => {
- data = success.clone().map(|success| success.as_bytes().to_vec())
- }
- ServerResponse::Failure(failure) => return Err(Error::SASL(failure.into())),
- }
- debug!("we went first");
- }
-
- // stepping the authentication exchange to completion
- if data != None {
- debug!("data: {:?}", data);
- let mut sasl_data = Vec::new();
- while {
- // decide if need to send more data over
- let state = session
- .step64(data.as_deref(), &mut sasl_data)
- .expect("step errored!");
- state.is_running()
- } {
- // While we aren't finished, receive more data from the other party
- let response = Response::new(str::from_utf8(&sasl_data)?.to_string());
- debug!("response: {:?}", response);
- self.writer.write_full(&response).await?;
- debug!("response written");
-
- let server_response: ServerResponse = self.reader.read().await?;
- debug!("server_response: {:#?}", server_response);
- match server_response {
- ServerResponse::Challenge(challenge) => {
- data = Some((*challenge).as_bytes().to_vec())
- }
- ServerResponse::Success(success) => {
- data = success.clone().map(|success| success.as_bytes().to_vec())
- }
- ServerResponse::Failure(failure) => return Err(Error::SASL(failure.into())),
- }
- }
- }
- let writer = self.writer.into_inner().into_inner();
- let reader = self.reader.into_inner().into_inner();
- let stream = reader.unsplit(writer);
- Ok(stream)
- }
-
- #[instrument]
- pub async fn bind(mut self, jid: &mut JID) -> Result<Self> {
- let iq_id = nanoid::nanoid!();
- if let Some(resource) = &jid.resourcepart {
- let iq = Iq {
- from: None,
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(Query::Bind(Bind {
- r#type: Some(BindType::Resource(ResourceType(resource.to_string()))),
- })),
- errors: Vec::new(),
- };
- self.writer.write_full(&iq).await?;
- let result: Iq = self.reader.read().await?;
- match result {
- Iq {
- from: _,
- id,
- to: _,
- r#type: IqType::Result,
- lang: _,
- query:
- Some(Query::Bind(Bind {
- r#type: Some(BindType::Jid(FullJidType(new_jid))),
- })),
- errors: _,
- } if id == iq_id => {
- *jid = new_jid;
- return Ok(self);
- }
- Iq {
- from: _,
- id,
- to: _,
- r#type: IqType::Error,
- lang: _,
- query: None,
- errors,
- } if id == iq_id => {
- return Err(Error::ClientError(
- errors.first().ok_or(Error::MissingError)?.clone(),
- ))
- }
- _ => return Err(Error::UnexpectedElement(result.into_element())),
- }
- } else {
- let iq = Iq {
- from: None,
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(Query::Bind(Bind { r#type: None })),
- errors: Vec::new(),
- };
- self.writer.write_full(&iq).await?;
- let result: Iq = self.reader.read().await?;
- match result {
- Iq {
- from: _,
- id,
- to: _,
- r#type: IqType::Result,
- lang: _,
- query:
- Some(Query::Bind(Bind {
- r#type: Some(BindType::Jid(FullJidType(new_jid))),
- })),
- errors: _,
- } if id == iq_id => {
- *jid = new_jid;
- return Ok(self);
- }
- Iq {
- from: _,
- id,
- to: _,
- r#type: IqType::Error,
- lang: _,
- query: None,
- errors,
- } if id == iq_id => {
- return Err(Error::ClientError(
- errors.first().ok_or(Error::MissingError)?.clone(),
- ))
- }
- _ => return Err(Error::UnexpectedElement(result.into_element())),
- }
- }
- }
-
- #[instrument]
- pub async fn start_stream(connection: S, server: &mut String) -> Result<Self> {
- // client to server
- let (reader, writer) = tokio::io::split(connection);
- let mut reader = JabberReader::new(Reader::new(reader));
- let mut writer = JabberWriter::new(Writer::new(writer));
-
- // declaration
- writer.write_declaration(XML_VERSION).await?;
-
- // opening stream element
- let stream = Stream::new_client(
- None,
- JID::from_str(server.as_ref())?,
- None,
- "en".to_string(),
- );
- writer.write_start(&stream).await?;
-
- // server to client
-
- // may or may not send a declaration
- let _decl = reader.read_prolog().await?;
-
- // receive stream element and validate
- let stream: Stream = reader.read_start().await?;
- debug!("got stream: {:?}", stream);
- if let Some(from) = stream.from {
- *server = from.to_string();
- }
-
- Ok(Self { reader, writer })
- }
-
- #[instrument]
- pub async fn get_features(mut self) -> Result<(Features, Self)> {
- debug!("getting features");
- let features: Features = self.reader.read().await?;
- debug!("got features: {:?}", features);
- Ok((features, self))
- }
-
- pub fn into_inner(self) -> S {
- self.reader
- .into_inner()
- .into_inner()
- .unsplit(self.writer.into_inner().into_inner())
- }
-
- pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {
- self.writer.write(stanza).await?;
- Ok(())
- }
-}
-
-impl JabberStream<Unencrypted> {
- #[instrument]
- pub async fn starttls(mut self, domain: impl AsRef<str> + std::fmt::Debug) -> Result<Tls> {
- self.writer
- .write_full(&StartTls { required: false })
- .await?;
- let proceed: Proceed = self.reader.read().await?;
- debug!("got proceed: {:?}", proceed);
- let connector = TlsConnector::new().unwrap();
- let stream = self
- .reader
- .into_inner()
- .into_inner()
- .unsplit(self.writer.into_inner().into_inner());
- if let Ok(tls_stream) = tokio_native_tls::TlsConnector::from(connector)
- .connect(domain.as_ref(), stream)
- .await
- {
- return Ok(tls_stream);
- } else {
- return Err(Error::Connection);
- }
- }
-}
-
-impl std::fmt::Debug for JabberStream<Tls> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Jabber")
- .field("connection", &"tls")
- .finish()
- }
-}
-
-impl std::fmt::Debug for JabberStream<Unencrypted> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Jabber")
- .field("connection", &"unencrypted")
- .finish()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use test_log::test;
-
- #[test(tokio::test)]
- async fn start_stream() {
- // let connection = Connection::connect("blos.sm", None, None).await.unwrap();
- // match connection {
- // Connection::Encrypted(mut c) => c.start_stream().await.unwrap(),
- // Connection::Unencrypted(mut c) => c.start_stream().await.unwrap(),
- // }
- }
-
- #[test(tokio::test)]
- async fn sasl() {
- // let mut jabber = Connection::connect_user("test@blos.sm", "slayed".to_string())
- // .await
- // .unwrap()
- // .ensure_tls()
- // .await
- // .unwrap();
- // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap();
- // println!("data: {}", text);
- // jabber.start_stream().await.unwrap();
-
- // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap();
- // println!("data: {}", text);
- // jabber.reader.read_buf().await.unwrap();
- // let text = str::from_utf8(jabber.reader.buffer.data()).unwrap();
- // println!("data: {}", text);
-
- // let features = jabber.get_features().await.unwrap();
- // let (sasl_config, feature) = (
- // jabber.auth.clone().unwrap(),
- // features
- // .features
- // .iter()
- // .find(|feature| matches!(feature, Feature::Sasl(_)))
- // .unwrap(),
- // );
- // match feature {
- // Feature::StartTls(_start_tls) => todo!(),
- // Feature::Sasl(mechanisms) => {
- // jabber.sasl(mechanisms.clone(), sasl_config).await.unwrap();
- // }
- // Feature::Bind => todo!(),
- // Feature::Unknown => todo!(),
- // }
- }
-
- #[tokio::test]
- async fn sink() {
- // let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
- // client.connect().await.unwrap();
- // let stream = client.inner().unwrap();
- // let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
- // stream.writer.write(&stanza).await?;
- // Ok::<JabberStream<Tls>, Error>(stream)
- // });
- // todo!()
- // let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string())
- // .await
- // .unwrap()
- // .ensure_tls()
- // .await
- // .unwrap()
- // .negotiate()
- // .await
- // .unwrap();
- // sleep(Duration::from_secs(5)).await
- }
-}
diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs
deleted file mode 100644
index 25b79ff..0000000
--- a/jabber/src/jabber_stream/bound_stream.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-use std::ops::{Deref, DerefMut};
-
-use tokio::io::{AsyncRead, AsyncWrite};
-
-use super::{JabberReader, JabberStream, JabberWriter};
-
-pub struct BoundJabberStream<S>(JabberStream<S>);
-
-impl<S> Deref for BoundJabberStream<S>
-where
- S: AsyncWrite + AsyncRead + Unpin + Send,
-{
- type Target = JabberStream<S>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<S> DerefMut for BoundJabberStream<S>
-where
- S: AsyncWrite + AsyncRead + Unpin + Send,
-{
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-impl<S> BoundJabberStream<S> {
- pub fn split(self) -> (BoundJabberReader<S>, BoundJabberWriter<S>) {
- let (reader, writer) = self.0.split();
- (BoundJabberReader(reader), BoundJabberWriter(writer))
- }
-}
-
-pub struct BoundJabberReader<S>(JabberReader<S>);
-
-impl<S> BoundJabberReader<S> {
- pub fn unsplit(self, writer: BoundJabberWriter<S>) -> BoundJabberStream<S> {
- BoundJabberStream(self.0.unsplit(writer.0))
- }
-}
-
-impl<S> std::ops::Deref for BoundJabberReader<S> {
- type Target = JabberReader<S>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<S> std::ops::DerefMut for BoundJabberReader<S> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-pub struct BoundJabberWriter<S>(JabberWriter<S>);
-
-impl<S> BoundJabberWriter<S> {
- pub fn unsplit(self, reader: BoundJabberReader<S>) -> BoundJabberStream<S> {
- BoundJabberStream(self.0.unsplit(reader.0))
- }
-}
-
-impl<S> std::ops::Deref for BoundJabberWriter<S> {
- type Target = JabberWriter<S>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<S> std::ops::DerefMut for BoundJabberWriter<S> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-impl<S> JabberStream<S>
-where
- S: AsyncWrite + AsyncRead + Unpin + Send,
-{
- pub fn to_bound_jabber(self) -> BoundJabberStream<S> {
- BoundJabberStream(self)
- }
-}
diff --git a/jabber/src/lib.rs b/jabber/src/lib.rs
deleted file mode 100644
index 8855ca7..0000000
--- a/jabber/src/lib.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-#![allow(unused_must_use)]
-// #![feature(let_chains)]
-
-// TODO: logging (dropped errors)
-pub mod client;
-pub mod connection;
-pub mod error;
-pub mod jabber_stream;
-
-pub use connection::Connection;
-pub use error::Error;
-pub use jabber_stream::JabberStream;
-pub use jid::JID;
-
-pub type Result<T> = std::result::Result<T, Error>;
-
-pub use client::connect_and_login;
-
-#[cfg(test)]
-mod tests {
- // #[tokio::test]
- // async fn test_login() {
- // crate::login("test@blos.sm/clown", "slayed").await.unwrap();
- // }
-}