From 595d165479b8b12e456f39205d8433b822b07487 Mon Sep 17 00:00:00 2001 From: cel 🌸 Date: Fri, 6 Dec 2024 06:31:20 +0000 Subject: implement sink and stream properly UNFOLD UNFOLD --- jabber/src/jabber_stream.rs | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) (limited to 'jabber/src/jabber_stream.rs') diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs index dd0dcbf..d981f8f 100644 --- a/jabber/src/jabber_stream.rs +++ b/jabber/src/jabber_stream.rs @@ -2,7 +2,7 @@ use std::pin::pin; use std::str::{self, FromStr}; use std::sync::Arc; -use futures::StreamExt; +use futures::{sink, stream, StreamExt}; use jid::JID; use peanuts::element::{FromContent, IntoElement}; use peanuts::{Reader, Writer}; @@ -22,28 +22,14 @@ use crate::connection::{Tls, Unencrypted}; use crate::error::Error; use crate::Result; +pub mod bound_stream; + // open stream (streams started) pub struct JabberStream { reader: Reader>, writer: Writer>, } -impl futures::Stream for JabberStream { - type Item = Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - pin!(self).reader.poll_next_unpin(cx).map(|content| { - content.map(|content| -> Result { - let stanza = content.map(|content| Stanza::from_content(content))?; - Ok(stanza?) - }) - }) - } -} - impl JabberStream where S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug, @@ -327,7 +313,8 @@ mod tests { use std::time::Duration; use super::*; - use crate::connection::Connection; + use crate::{client::JabberClient, connection::Connection}; + use futures::sink; use test_log::test; use tokio::time::sleep; @@ -378,7 +365,15 @@ mod tests { } #[tokio::test] - async fn negotiate() { + async fn sink() { + let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap(); + client.connect().await.unwrap(); + let stream = client.inner().unwrap(); + let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move { + stream.writer.write(&stanza).await?; + Ok::, Error>(stream) + }); + todo!() // let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string()) // .await // .unwrap() -- cgit