diff options
| author | 2024-12-06 06:31:20 +0000 | |
|---|---|---|
| committer | 2024-12-06 06:31:20 +0000 | |
| commit | 595d165479b8b12e456f39205d8433b822b07487 (patch) | |
| tree | a1a112f92c738ff7b22e97d9f71dda71fe85cf76 /jabber/src | |
| parent | aaf34b5bcad1a897bb6fb704aab6b9cd6f6c4620 (diff) | |
| download | luz-595d165479b8b12e456f39205d8433b822b07487.tar.gz luz-595d165479b8b12e456f39205d8433b822b07487.tar.bz2 luz-595d165479b8b12e456f39205d8433b822b07487.zip | |
implement sink and stream properly UNFOLD UNFOLD
Diffstat (limited to '')
| -rw-r--r-- | jabber/src/client.rs | 24 | ||||
| -rw-r--r-- | jabber/src/error.rs | 1 | ||||
| -rw-r--r-- | jabber/src/jabber_stream.rs | 33 | ||||
| -rw-r--r-- | jabber/src/jabber_stream/bound_stream.rs | 153 | 
4 files changed, 176 insertions, 35 deletions
| diff --git a/jabber/src/client.rs b/jabber/src/client.rs index c8b0b73..c6cab07 100644 --- a/jabber/src/client.rs +++ b/jabber/src/client.rs @@ -56,6 +56,14 @@ impl JabberClient {          }      } +    pub(crate) fn inner(self) -> Result<JabberStream<Tls>> { +        match self.connection { +            ConnectionState::Disconnected => return Err(Error::Disconnected), +            ConnectionState::Connecting(_connecting) => return Err(Error::Connecting), +            ConnectionState::Connected(jabber_stream) => return Ok(jabber_stream), +        } +    } +      pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {          match &mut self.connection {              ConnectionState::Disconnected => return Err(Error::Disconnected), @@ -67,22 +75,6 @@ impl JabberClient {      }  } -impl Stream for JabberClient { -    type Item = Result<Stanza>; - -    fn poll_next( -        self: std::pin::Pin<&mut Self>, -        cx: &mut std::task::Context<'_>, -    ) -> std::task::Poll<Option<Self::Item>> { -        let mut client = pin!(self); -        match &mut client.connection { -            ConnectionState::Disconnected => Poll::Pending, -            ConnectionState::Connecting(_connecting) => Poll::Pending, -            ConnectionState::Connected(jabber_stream) => jabber_stream.poll_next_unpin(cx), -        } -    } -} -  pub enum ConnectionState {      Disconnected,      Connecting(Connecting), diff --git a/jabber/src/error.rs b/jabber/src/error.rs index aad033c..6671fe6 100644 --- a/jabber/src/error.rs +++ b/jabber/src/error.rs @@ -16,6 +16,7 @@ pub enum Error {      Unsupported,      NoLocalpart,      AlreadyConnecting, +    StreamClosed,      UnexpectedElement(peanuts::Element),      XML(peanuts::Error),      Deserialization(peanuts::DeserializeError), diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs index dd0dcbf..d981f8f 100644 --- a/jabber/src/jabber_stream.rs +++ b/jabber/src/jabber_stream.rs @@ -2,7 +2,7 @@ use std::pin::pin;  use std::str::{self, FromStr};  use std::sync::Arc; -use futures::StreamExt; +use futures::{sink, stream, StreamExt};  use jid::JID;  use peanuts::element::{FromContent, IntoElement};  use peanuts::{Reader, Writer}; @@ -22,28 +22,14 @@ use crate::connection::{Tls, Unencrypted};  use crate::error::Error;  use crate::Result; +pub mod bound_stream; +  // open stream (streams started)  pub struct JabberStream<S> {      reader: Reader<ReadHalf<S>>,      writer: Writer<WriteHalf<S>>,  } -impl<S: AsyncRead> futures::Stream for JabberStream<S> { -    type Item = Result<Stanza>; - -    fn poll_next( -        self: std::pin::Pin<&mut Self>, -        cx: &mut std::task::Context<'_>, -    ) -> std::task::Poll<Option<Self::Item>> { -        pin!(self).reader.poll_next_unpin(cx).map(|content| { -            content.map(|content| -> Result<Stanza> { -                let stanza = content.map(|content| Stanza::from_content(content))?; -                Ok(stanza?) -            }) -        }) -    } -} -  impl<S> JabberStream<S>  where      S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug, @@ -327,7 +313,8 @@ mod tests {      use std::time::Duration;      use super::*; -    use crate::connection::Connection; +    use crate::{client::JabberClient, connection::Connection}; +    use futures::sink;      use test_log::test;      use tokio::time::sleep; @@ -378,7 +365,15 @@ mod tests {      }      #[tokio::test] -    async fn negotiate() { +    async fn sink() { +        let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap(); +        client.connect().await.unwrap(); +        let stream = client.inner().unwrap(); +        let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move { +            stream.writer.write(&stanza).await?; +            Ok::<JabberStream<Tls>, Error>(stream) +        }); +        todo!()          // let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string())          //     .await          //     .unwrap() diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs new file mode 100644 index 0000000..ca93421 --- /dev/null +++ b/jabber/src/jabber_stream/bound_stream.rs @@ -0,0 +1,153 @@ +use std::pin::pin; +use std::pin::Pin; +use std::sync::Arc; + +use futures::{sink, stream, Sink, Stream}; +use peanuts::{Reader, Writer}; +use pin_project::pin_project; +use stanza::client::Stanza; +use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; +use tokio::sync::Mutex; + +use crate::Error; + +use super::JabberStream; + +#[pin_project] +pub struct BoundJabberStream<R, W, S> +where +    R: Stream, +    W: Sink<Stanza>, +    S: AsyncWrite + AsyncRead + Unpin + Send, +{ +    reader: Arc<Mutex<Option<Reader<ReadHalf<S>>>>>, +    writer: Arc<Mutex<Option<Writer<WriteHalf<S>>>>>, +    stream: R, +    sink: W, +} + +impl<R, W, S> BoundJabberStream<R, W, S> +where +    R: Stream, +    W: Sink<Stanza>, +    S: AsyncWrite + AsyncRead + Unpin + Send, +{ +    // TODO: look into biased mutex, to close stream ASAP +    pub async fn close_stream(self) -> Result<JabberStream<S>, Error> { +        if let Some(reader) = self.reader.lock().await.take() { +            if let Some(writer) = self.writer.lock().await.take() { +                // TODO: writer </stream:stream> +                return Ok(JabberStream { reader, writer }); +            } +        } +        return Err(Error::StreamClosed); +    } +} + +pub trait JabberStreamTrait: AsyncWrite + AsyncRead + Unpin + Send {} + +impl<R, W, S> Sink<Stanza> for BoundJabberStream<R, W, S> +where +    R: Stream, +    W: Sink<Stanza> + Unpin, +    S: AsyncWrite + AsyncRead + Unpin + Send, +{ +    type Error = <W as Sink<Stanza>>::Error; + +    fn poll_ready( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> std::task::Poll<Result<(), Self::Error>> { +        let this = self.project(); +        pin!(this.sink).poll_ready(cx) +    } + +    fn start_send(self: std::pin::Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> { +        let this = self.project(); +        pin!(this.sink).start_send(item) +    } + +    fn poll_flush( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> std::task::Poll<Result<(), Self::Error>> { +        let this = self.project(); +        pin!(this.sink).poll_flush(cx) +    } + +    fn poll_close( +        self: std::pin::Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> std::task::Poll<Result<(), Self::Error>> { +        let this = self.project(); +        pin!(this.sink).poll_close(cx) +    } +} + +impl<R, W, S> Stream for BoundJabberStream<R, W, S> +where +    R: Stream + Unpin, +    W: Sink<Stanza>, +    S: AsyncWrite + AsyncRead + Unpin + Send, +{ +    type Item = <R as Stream>::Item; + +    fn poll_next( +        self: Pin<&mut Self>, +        cx: &mut std::task::Context<'_>, +    ) -> std::task::Poll<Option<Self::Item>> { +        let this = self.project(); +        pin!(this.stream).poll_next(cx) +    } +} + +impl<S> JabberStream<S> +where +    S: AsyncWrite + AsyncRead + Unpin + Send, +{ +    pub fn to_bound_jabber(self) -> BoundJabberStream<impl Stream, impl Sink<Stanza>, S> { +        let reader = Arc::new(Mutex::new(Some(self.reader))); +        let writer = Arc::new(Mutex::new(Some(self.writer))); +        let sink = sink::unfold(writer.clone(), |writer, s: Stanza| async move { +            write(writer, s).await +        }); +        let stream = stream::unfold(reader.clone(), |reader| async { read(reader).await }); +        BoundJabberStream { +            sink, +            stream, +            writer, +            reader, +        } +    } +} + +pub async fn write<W: AsyncWrite + Unpin + Send>( +    writer: Arc<Mutex<Option<Writer<WriteHalf<W>>>>>, +    stanza: Stanza, +) -> Result<Arc<Mutex<Option<Writer<WriteHalf<W>>>>>, Error> { +    { +        if let Some(writer) = writer.lock().await.as_mut() { +            writer.write(&stanza).await?; +        } else { +            return Err(Error::StreamClosed); +        } +    } +    Ok(writer) +} + +pub async fn read<R: AsyncRead + Unpin + Send>( +    reader: Arc<Mutex<Option<Reader<ReadHalf<R>>>>>, +) -> Option<( +    Result<Stanza, Error>, +    Arc<Mutex<Option<Reader<ReadHalf<R>>>>>, +)> { +    let stanza: Result<Stanza, Error>; +    { +        if let Some(reader) = reader.lock().await.as_mut() { +            stanza = reader.read().await.map_err(|e| e.into()); +        } else { +            stanza = Err(Error::StreamClosed) +        }; +    } +    Some((stanza, reader)) +} | 
