aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2024-12-06 06:31:20 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2024-12-06 06:31:20 +0000
commit595d165479b8b12e456f39205d8433b822b07487 (patch)
treea1a112f92c738ff7b22e97d9f71dda71fe85cf76
parentaaf34b5bcad1a897bb6fb704aab6b9cd6f6c4620 (diff)
downloadluz-595d165479b8b12e456f39205d8433b822b07487.tar.gz
luz-595d165479b8b12e456f39205d8433b822b07487.tar.bz2
luz-595d165479b8b12e456f39205d8433b822b07487.zip
implement sink and stream properly UNFOLD UNFOLD
-rw-r--r--README.md2
-rw-r--r--jabber/Cargo.toml3
-rw-r--r--jabber/src/client.rs24
-rw-r--r--jabber/src/error.rs1
-rw-r--r--jabber/src/jabber_stream.rs33
-rw-r--r--jabber/src/jabber_stream/bound_stream.rs153
6 files changed, 181 insertions, 35 deletions
diff --git a/README.md b/README.md
index 44632dd..63094ae 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
## TODO:
+- [ ] how to know if stanza has been sent
- [ ] error states for all negotiation parts
- [ ] better errors
- [x] rename structs
@@ -74,6 +75,7 @@ need more research:
- [ ] message editing
- [ ] xep-0308: last message correction (should not be used for older than last message according to spec)
- [ ] chat read markers
+ - [ ] xep-0490: message displayed synchronization
- [ ] xep-0333: displayed markers
- [ ] message styling
- [ ] xep-0393: message styling
diff --git a/jabber/Cargo.toml b/jabber/Cargo.toml
index 4753e59..68dddd9 100644
--- a/jabber/Cargo.toml
+++ b/jabber/Cargo.toml
@@ -22,6 +22,9 @@ stanza = { version = "0.1.0", path = "../stanza" }
peanuts = { version = "0.1.0", path = "../../peanuts" }
jid = { version = "0.1.0", path = "../jid" }
futures = "0.3.31"
+take_mut = "0.2.2"
+pin-project-lite = "0.2.15"
+pin-project = "1.1.7"
[dev-dependencies]
test-log = { version = "0.2", features = ["trace"] }
diff --git a/jabber/src/client.rs b/jabber/src/client.rs
index c8b0b73..c6cab07 100644
--- a/jabber/src/client.rs
+++ b/jabber/src/client.rs
@@ -56,6 +56,14 @@ impl JabberClient {
}
}
+ pub(crate) fn inner(self) -> Result<JabberStream<Tls>> {
+ match self.connection {
+ ConnectionState::Disconnected => return Err(Error::Disconnected),
+ ConnectionState::Connecting(_connecting) => return Err(Error::Connecting),
+ ConnectionState::Connected(jabber_stream) => return Ok(jabber_stream),
+ }
+ }
+
pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {
match &mut self.connection {
ConnectionState::Disconnected => return Err(Error::Disconnected),
@@ -67,22 +75,6 @@ impl JabberClient {
}
}
-impl Stream for JabberClient {
- type Item = Result<Stanza>;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- let mut client = pin!(self);
- match &mut client.connection {
- ConnectionState::Disconnected => Poll::Pending,
- ConnectionState::Connecting(_connecting) => Poll::Pending,
- ConnectionState::Connected(jabber_stream) => jabber_stream.poll_next_unpin(cx),
- }
- }
-}
-
pub enum ConnectionState {
Disconnected,
Connecting(Connecting),
diff --git a/jabber/src/error.rs b/jabber/src/error.rs
index aad033c..6671fe6 100644
--- a/jabber/src/error.rs
+++ b/jabber/src/error.rs
@@ -16,6 +16,7 @@ pub enum Error {
Unsupported,
NoLocalpart,
AlreadyConnecting,
+ StreamClosed,
UnexpectedElement(peanuts::Element),
XML(peanuts::Error),
Deserialization(peanuts::DeserializeError),
diff --git a/jabber/src/jabber_stream.rs b/jabber/src/jabber_stream.rs
index dd0dcbf..d981f8f 100644
--- a/jabber/src/jabber_stream.rs
+++ b/jabber/src/jabber_stream.rs
@@ -2,7 +2,7 @@ use std::pin::pin;
use std::str::{self, FromStr};
use std::sync::Arc;
-use futures::StreamExt;
+use futures::{sink, stream, StreamExt};
use jid::JID;
use peanuts::element::{FromContent, IntoElement};
use peanuts::{Reader, Writer};
@@ -22,28 +22,14 @@ use crate::connection::{Tls, Unencrypted};
use crate::error::Error;
use crate::Result;
+pub mod bound_stream;
+
// open stream (streams started)
pub struct JabberStream<S> {
reader: Reader<ReadHalf<S>>,
writer: Writer<WriteHalf<S>>,
}
-impl<S: AsyncRead> futures::Stream for JabberStream<S> {
- type Item = Result<Stanza>;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- pin!(self).reader.poll_next_unpin(cx).map(|content| {
- content.map(|content| -> Result<Stanza> {
- let stanza = content.map(|content| Stanza::from_content(content))?;
- Ok(stanza?)
- })
- })
- }
-}
-
impl<S> JabberStream<S>
where
S: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug,
@@ -327,7 +313,8 @@ mod tests {
use std::time::Duration;
use super::*;
- use crate::connection::Connection;
+ use crate::{client::JabberClient, connection::Connection};
+ use futures::sink;
use test_log::test;
use tokio::time::sleep;
@@ -378,7 +365,15 @@ mod tests {
}
#[tokio::test]
- async fn negotiate() {
+ async fn sink() {
+ let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
+ client.connect().await.unwrap();
+ let stream = client.inner().unwrap();
+ let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
+ stream.writer.write(&stanza).await?;
+ Ok::<JabberStream<Tls>, Error>(stream)
+ });
+ todo!()
// let _jabber = Connection::connect_user("test@blos.sm", "slayed".to_string())
// .await
// .unwrap()
diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs
new file mode 100644
index 0000000..ca93421
--- /dev/null
+++ b/jabber/src/jabber_stream/bound_stream.rs
@@ -0,0 +1,153 @@
+use std::pin::pin;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::{sink, stream, Sink, Stream};
+use peanuts::{Reader, Writer};
+use pin_project::pin_project;
+use stanza::client::Stanza;
+use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
+use tokio::sync::Mutex;
+
+use crate::Error;
+
+use super::JabberStream;
+
+#[pin_project]
+pub struct BoundJabberStream<R, W, S>
+where
+ R: Stream,
+ W: Sink<Stanza>,
+ S: AsyncWrite + AsyncRead + Unpin + Send,
+{
+ reader: Arc<Mutex<Option<Reader<ReadHalf<S>>>>>,
+ writer: Arc<Mutex<Option<Writer<WriteHalf<S>>>>>,
+ stream: R,
+ sink: W,
+}
+
+impl<R, W, S> BoundJabberStream<R, W, S>
+where
+ R: Stream,
+ W: Sink<Stanza>,
+ S: AsyncWrite + AsyncRead + Unpin + Send,
+{
+ // TODO: look into biased mutex, to close stream ASAP
+ pub async fn close_stream(self) -> Result<JabberStream<S>, Error> {
+ if let Some(reader) = self.reader.lock().await.take() {
+ if let Some(writer) = self.writer.lock().await.take() {
+ // TODO: writer </stream:stream>
+ return Ok(JabberStream { reader, writer });
+ }
+ }
+ return Err(Error::StreamClosed);
+ }
+}
+
+pub trait JabberStreamTrait: AsyncWrite + AsyncRead + Unpin + Send {}
+
+impl<R, W, S> Sink<Stanza> for BoundJabberStream<R, W, S>
+where
+ R: Stream,
+ W: Sink<Stanza> + Unpin,
+ S: AsyncWrite + AsyncRead + Unpin + Send,
+{
+ type Error = <W as Sink<Stanza>>::Error;
+
+ fn poll_ready(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ let this = self.project();
+ pin!(this.sink).poll_ready(cx)
+ }
+
+ fn start_send(self: std::pin::Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
+ let this = self.project();
+ pin!(this.sink).start_send(item)
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ let this = self.project();
+ pin!(this.sink).poll_flush(cx)
+ }
+
+ fn poll_close(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ let this = self.project();
+ pin!(this.sink).poll_close(cx)
+ }
+}
+
+impl<R, W, S> Stream for BoundJabberStream<R, W, S>
+where
+ R: Stream + Unpin,
+ W: Sink<Stanza>,
+ S: AsyncWrite + AsyncRead + Unpin + Send,
+{
+ type Item = <R as Stream>::Item;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let this = self.project();
+ pin!(this.stream).poll_next(cx)
+ }
+}
+
+impl<S> JabberStream<S>
+where
+ S: AsyncWrite + AsyncRead + Unpin + Send,
+{
+ pub fn to_bound_jabber(self) -> BoundJabberStream<impl Stream, impl Sink<Stanza>, S> {
+ let reader = Arc::new(Mutex::new(Some(self.reader)));
+ let writer = Arc::new(Mutex::new(Some(self.writer)));
+ let sink = sink::unfold(writer.clone(), |writer, s: Stanza| async move {
+ write(writer, s).await
+ });
+ let stream = stream::unfold(reader.clone(), |reader| async { read(reader).await });
+ BoundJabberStream {
+ sink,
+ stream,
+ writer,
+ reader,
+ }
+ }
+}
+
+pub async fn write<W: AsyncWrite + Unpin + Send>(
+ writer: Arc<Mutex<Option<Writer<WriteHalf<W>>>>>,
+ stanza: Stanza,
+) -> Result<Arc<Mutex<Option<Writer<WriteHalf<W>>>>>, Error> {
+ {
+ if let Some(writer) = writer.lock().await.as_mut() {
+ writer.write(&stanza).await?;
+ } else {
+ return Err(Error::StreamClosed);
+ }
+ }
+ Ok(writer)
+}
+
+pub async fn read<R: AsyncRead + Unpin + Send>(
+ reader: Arc<Mutex<Option<Reader<ReadHalf<R>>>>>,
+) -> Option<(
+ Result<Stanza, Error>,
+ Arc<Mutex<Option<Reader<ReadHalf<R>>>>>,
+)> {
+ let stanza: Result<Stanza, Error>;
+ {
+ if let Some(reader) = reader.lock().await.as_mut() {
+ stanza = reader.read().await.map_err(|e| e.into());
+ } else {
+ stanza = Err(Error::StreamClosed)
+ };
+ }
+ Some((stanza, reader))
+}