diff options
author | 2024-12-06 06:31:20 +0000 | |
---|---|---|
committer | 2024-12-06 06:31:20 +0000 | |
commit | 595d165479b8b12e456f39205d8433b822b07487 (patch) | |
tree | a1a112f92c738ff7b22e97d9f71dda71fe85cf76 /jabber/src/jabber_stream.rs | |
parent | aaf34b5bcad1a897bb6fb704aab6b9cd6f6c4620 (diff) | |
download | luz-595d165479b8b12e456f39205d8433b822b07487.tar.gz luz-595d165479b8b12e456f39205d8433b822b07487.tar.bz2 luz-595d165479b8b12e456f39205d8433b822b07487.zip |
implement sink and stream properly UNFOLD UNFOLD
Diffstat (limited to 'jabber/src/jabber_stream.rs')
-rw-r--r-- | jabber/src/jabber_stream.rs | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs index dd0dcbf..d981f8f 100644 --- a/jabber/src/jabber_stream.rs +++ b/jabber/src/jabber_stream.rs @@ -2,7 +2,7 @@ use std::pin::pin; use std::str::{self, FromStr}; use std::sync::Arc; -use futures::StreamExt; +use futures::{sink, stream, StreamExt}; use jid::JID; use peanuts::element::{FromContent, IntoElement}; use peanuts::{Reader, Writer}; @@ -22,28 +22,14 @@ use crate::connection::{Tls, Unencrypted}; use crate::error::Error; use crate::Result; +pub mod bound_stream; + // open stream (streams started) pub struct JabberStream<S> { reader: Reader<ReadHalf<S>>, writer: Writer<WriteHalf<S>>, } -impl<S: AsyncRead> futures::Stream for JabberStream<S> { - type Item = Result<Stanza>; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - pin!(self).reader.poll_next_unpin(cx).map(|content| { - content.map(|content| -> Result<Stanza> { - let stanza = content.map(|content| Stanza::from_content(content))?; - Ok(stanza?) - }) - }) - } -} - impl<S> JabberStream<S> where S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug, @@ -327,7 +313,8 @@ mod tests { use std::time::Duration; use super::*; - use crate::connection::Connection; + use crate::{client::JabberClient, connection::Connection}; + use futures::sink; use test_log::test; use tokio::time::sleep; @@ -378,7 +365,15 @@ mod tests { } #[tokio::test] - async fn negotiate() { + async fn sink() { + let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap(); + client.connect().await.unwrap(); + let stream = client.inner().unwrap(); + let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move { + stream.writer.write(&stanza).await?; + Ok::<JabberStream<Tls>, Error>(stream) + }); + todo!() // let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string()) // .await // .unwrap() |