aboutsummaryrefslogtreecommitdiffstats
path: root/jabber/src/jabber_stream.rs
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2024-12-06 06:31:20 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2024-12-06 06:31:20 +0000
commit595d165479b8b12e456f39205d8433b822b07487 (patch)
treea1a112f92c738ff7b22e97d9f71dda71fe85cf76 /jabber/src/jabber_stream.rs
parentaaf34b5bcad1a897bb6fb704aab6b9cd6f6c4620 (diff)
downloadluz-595d165479b8b12e456f39205d8433b822b07487.tar.gz
luz-595d165479b8b12e456f39205d8433b822b07487.tar.bz2
luz-595d165479b8b12e456f39205d8433b822b07487.zip
implement sink and stream properly UNFOLD UNFOLD
Diffstat (limited to 'jabber/src/jabber_stream.rs')
-rw-r--r--jabber/src/jabber_stream.rs33
1 files changed, 14 insertions, 19 deletions
diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs
index dd0dcbf..d981f8f 100644
--- a/jabber/src/jabber_stream.rs
+++ b/jabber/src/jabber_stream.rs
@@ -2,7 +2,7 @@ use std::pin::pin;
use std::str::{self, FromStr};
use std::sync::Arc;
-use futures::StreamExt;
+use futures::{sink, stream, StreamExt};
use jid::JID;
use peanuts::element::{FromContent, IntoElement};
use peanuts::{Reader, Writer};
@@ -22,28 +22,14 @@ use crate::connection::{Tls, Unencrypted};
use crate::error::Error;
use crate::Result;
+pub mod bound_stream;
+
// open stream (streams started)
pub struct JabberStream<S> {
reader: Reader<ReadHalf<S>>,
writer: Writer<WriteHalf<S>>,
}
-impl<S: AsyncRead> futures::Stream for JabberStream<S> {
- type Item = Result<Stanza>;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- pin!(self).reader.poll_next_unpin(cx).map(|content| {
- content.map(|content| -> Result<Stanza> {
- let stanza = content.map(|content| Stanza::from_content(content))?;
- Ok(stanza?)
- })
- })
- }
-}
-
impl<S> JabberStream<S>
where
S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug,
@@ -327,7 +313,8 @@ mod tests {
use std::time::Duration;
use super::*;
- use crate::connection::Connection;
+ use crate::{client::JabberClient, connection::Connection};
+ use futures::sink;
use test_log::test;
use tokio::time::sleep;
@@ -378,7 +365,15 @@ mod tests {
}
#[tokio::test]
- async fn negotiate() {
+ async fn sink() {
+ let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
+ client.connect().await.unwrap();
+ let stream = client.inner().unwrap();
+ let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
+ stream.writer.write(&stanza).await?;
+ Ok::<JabberStream<Tls>, Error>(stream)
+ });
+ todo!()
// let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string())
// .await
// .unwrap()