about summary refs log tree commit diff stats
path: root/src/reader.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/reader.rs')
-rw-r--r-- src/reader.rs 262
1 files changed, 230 insertions, 32 deletions
diff --git a/src/reader.rs b/src/reader.rs
index 93b28af..a403171 100644
--- a/src/reader.rs
+++ b/src/reader.rs
@@ -1,12 +1,24 @@
+use std::io::Write;
+
use circular::Buffer;
+#[cfg(target_arch = "wasm32")]
+use js_sys::{ArrayBuffer, Uint8Array};
use nom::Err;
use std::{
collections::{HashMap, HashSet, VecDeque},
str,
};
-use tokio::io::{AsyncRead, AsyncReadExt};
-use tracing::{debug, info, trace};
-
+use tokio::io::AsyncRead;
+#[cfg(target_arch = "wasm32")]
+use tokio::sync::mpsc;
+use tracing::{info, trace};
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen::{closure::Closure, JsCast};
+#[cfg(target_arch = "wasm32")]
+use web_sys::{Blob, MessageEvent};
+
+#[cfg(target_arch = "wasm32")]
+use crate::error::WebsocketError;
use crate::{
declaration::{Declaration, VersionInfo},
element::{Content, Element, FromElement, Name, NamespaceDeclaration},
@@ -17,19 +29,144 @@ use crate::{
static MAX_STANZA_SIZE: usize = 65536;
-/// streaming reader that tracks depth and available namespaces at current depth
+/// Reader that tracks depth and corresponding declared/available namespaces.
#[derive(Debug)]
pub struct Reader<R> {
inner: R,
- pub buffer: Buffer,
+ buffer: Buffer,
// holds which tags we are in atm over depth
// to have names reference namespaces could
depth: Vec<Name>,
namespace_declarations: Vec<HashSet<NamespaceDeclaration>>,
+ unendable: bool,
root_ended: bool,
}
+/// Represents a WebSocket Message, after converting from JavaScript type.
+/// from https://github.com/najamelan/ws_stream_wasm/blob/dev/src/ws_message.rs
+#[cfg(target_arch = "wasm32")]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum WsMessage {
+ /// The data of the message is a string.
+ ///
+ Text(String),
+
+ /// The message contains binary data.
+ ///
+ Binary(Vec<u8>),
+}
+
+/// This will convert the JavaScript event into a WsMessage. Note that this
+/// will only work if the connection is set to use the binary type ArrayBuffer.
+/// On binary type Blob, this returns `WebsocketError::CantDecodeBlob` instead of a message.
+/// from https://github.com/najamelan/ws_stream_wasm/blob/dev/src/ws_message.rs
+#[cfg(target_arch = "wasm32")]
+impl TryFrom<MessageEvent> for WsMessage {
+ type Error = WebsocketError;
+
+ fn try_from(evt: MessageEvent) -> std::result::Result<Self, Self::Error> {
+ match evt.data() {
+ d if d.is_instance_of::<ArrayBuffer>() => {
+ let buffy = Uint8Array::new(d.unchecked_ref());
+ let mut v = vec![0; buffy.length() as usize];
+
+ buffy.copy_to(&mut v); // FIXME: get rid of this copy
+
+ Ok(WsMessage::Binary(v))
+ }
+
+ // We don't allow invalid encodings. In principle if needed,
+ // we could add a variant to WsMessage with a CString or an OsString
+ // to allow the user to access this data. However until there is a usecase,
+ // I'm not inclined, amongst other things because the conversion from Js isn't very
+ // clear and it would require a bunch of testing for something that's a rather bad
+ // idea to begin with. If you need data that is not a valid string, use a binary
+ // message.
+ //
+ d if d.is_string() => match d.as_string() {
+ Some(text) => Ok(WsMessage::Text(text)),
+ None => Err(WebsocketError::InvalidEncoding),
+ },
+
+ // We have set the binary mode to array buffer (WsMeta::connect), so normally this shouldn't happen.
+ // That is as long as this is used within the context of the WsMeta constructor.
+ //
+ d if d.is_instance_of::<Blob>() => Err(WebsocketError::CantDecodeBlob),
+
+ // should never happen.
+ //
+ _ => Err(WebsocketError::UnknownDataType),
+ }
+ }
+}
+
+#[cfg(target_arch = "wasm32")]
+#[derive(Debug)]
+/// Receiver for websocket frames. Implements `Readable` for asynchronous XML reading.
+pub struct WebSocketOnMessageRead {
+ queue: mpsc::UnboundedReceiver<WsMessage>,
+}
+
+#[cfg(target_arch = "wasm32")]
+impl WebSocketOnMessageRead {
+ /// Create a new `WebSocketOnMessageRead` with corresponding `on_message` event closure.
+ pub fn new() -> (Closure<dyn FnMut(MessageEvent)>, Self) {
+ let (send, recv) = mpsc::unbounded_channel();
+ let on_msg = Closure::wrap(Box::new(move |msg_evt: MessageEvent| {
+ let msg_evt = msg_evt.try_into();
+ match msg_evt {
+ Ok(msg_evt) => match send.send(msg_evt) {
+ Ok(()) => {}
+ Err(e) => {
+ tracing::error!("message event send error: {:?}", e);
+ }
+ },
+ Err(e) => {
+ tracing::error!("websocket receive error: {}", e);
+ }
+ }
+ }) as Box<dyn FnMut(MessageEvent)>);
+
+ (on_msg, Self { queue: recv })
+ }
+}
+
+#[cfg(target_arch = "wasm32")]
+impl Readable for WebSocketOnMessageRead {
+ async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> {
+ let msg = self.queue.recv().await;
+ let msg = match msg {
+ Some(msg) => msg,
+ None => return Err(Error::WebSocketClosed),
+ };
+ match msg {
+ WsMessage::Text(s) => {
+ let text = s.as_bytes();
+ Ok(buffer.write(text)?)
+ }
+ WsMessage::Binary(v) => Ok(buffer.write(&v)?),
+ }
+ }
+}
+
+/// Trait for abstracting asynchronous read streams.
+pub trait Readable {
+ fn read_buf(&mut self, buffer: &mut Buffer)
+ -> impl std::future::Future<Output = Result<usize>>;
+}
+
+/// String wrapper which implements Readable, for string parsing.
+pub struct ReadableString(pub String);
+
+impl Readable for ReadableString {
+ async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> {
+ let string = self.0.split_off(0);
+ Ok(buffer.write(string.as_bytes())?)
+ }
+}
+
impl<R> Reader<R> {
+ /// Create a new `Reader` which is constrained to a single root element.
pub fn new(reader: R) -> Self {
let mut default_declarations = HashSet::new();
default_declarations.insert(NamespaceDeclaration {
@@ -46,25 +183,52 @@ impl<R> Reader<R> {
depth: Vec::new(),
// TODO: make sure reserved namespaces are never overwritten
namespace_declarations: vec![default_declarations],
+ unendable: false,
+ root_ended: false,
+ }
+ }
+
+ /// Create a new `Reader` which is not constrained to a single root element.
+ pub fn new_unendable(reader: R) -> Self {
+ let mut default_declarations = HashSet::new();
+ default_declarations.insert(NamespaceDeclaration {
+ prefix: Some("xml".to_string()),
+ namespace: XML_NS.to_string(),
+ });
+ default_declarations.insert(NamespaceDeclaration {
+ prefix: Some("xmlns".to_string()),
+ namespace: XMLNS_NS.to_string(),
+ });
+ Self {
+ inner: reader,
+ buffer: Buffer::with_capacity(MAX_STANZA_SIZE),
+ depth: Vec::new(),
+ // TODO: make sure reserved namespaces are never overwritten
+ namespace_declarations: vec![default_declarations],
+ unendable: true,
root_ended: false,
}
}
+ /// Extract the inner type from the `Reader`.
pub fn into_inner(self) -> R {
self.inner
}
}
-impl<R> Reader<R>
+impl<R> Readable for R
where
R: AsyncRead + Unpin,
{
- pub async fn read_buf<'s>(&mut self) -> Result<usize> {
- Ok(self.inner.read_buf(&mut self.buffer).await?)
+ async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> {
+ Ok(tokio::io::AsyncReadExt::read_buf(self, buffer).await?)
}
+}
+impl<R: Readable> Reader<R> {
+ /// Attempt to read an XML prolog, which could include an XML declaration, miscellaneous items (e.g. comments, processing instructions), and/or a doctype declaration.
pub async fn read_prolog<'s>(&'s mut self) -> Result<Option<Declaration>> {
- if self.root_ended {
+ if !self.unendable && self.root_ended {
return Err(Error::RootElementEnded);
}
loop {
@@ -104,30 +268,37 @@ where
}
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
}
}
}
+ /// Read a start tag, moving up in document depth, and convert it into a type which implements `FromElement`.
pub async fn read_start<'s, T: FromElement>(&'s mut self) -> Result<T> {
let element = self.read_start_tag().await?;
trace!("read element start: {:?}", element);
Ok(FromElement::from_element(element)?)
}
+ /// Read a full element (start tag + content + end tag, or empty tag) and convert it into a type which implements `FromElement`.
pub async fn read<'s, T: FromElement>(&'s mut self) -> Result<T> {
let element = self.read_element().await?;
trace!("read element: {:?}", element);
Ok(FromElement::from_element(element)?)
}
+ /// Read a start tag, moving up in document depth.
pub async fn read_start_tag<'s>(&'s mut self) -> Result<Element> {
- if self.root_ended {
+ if !self.unendable && self.root_ended {
return Err(Error::RootElementEnded);
}
loop {
@@ -147,18 +318,23 @@ where
}
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
}
}
}
+ /// Read an end tag, moving down in document depth.
pub async fn read_end_tag<'s>(&'s mut self) -> Result<()> {
- if self.root_ended {
+ if !self.unendable && self.root_ended {
return Err(Error::RootElementEnded);
}
loop {
@@ -181,18 +357,23 @@ where
}
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
}
}
}
+ /// Read a full element (start tag + content + end tag, or empty tag).
pub async fn read_element<'s>(&'s mut self) -> Result<Element> {
- if self.root_ended {
+ if !self.unendable && self.root_ended {
return Err(Error::RootElementEnded);
}
loop {
@@ -212,18 +393,23 @@ where
}
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
}
}
}
+ /// Read element content (text, another full element, a comment, a PI).
pub async fn read_content<'s>(&'s mut self) -> Result<Content> {
- if self.root_ended {
+ if !self.unendable && self.root_ended {
return Err(Error::RootElementEnded);
}
let mut last_char = false;
@@ -240,7 +426,7 @@ where
}
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
_ => match xml::ContentItem::parse(input) {
Ok((rest, content_item)) => match content_item {
@@ -297,11 +483,15 @@ where
},
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
},
},
@@ -358,11 +548,15 @@ where
},
std::result::Result::Err(e) => match e {
Err::Incomplete(_) => {
- self.read_buf().await?;
+ self.inner.read_buf(&mut self.buffer).await?;
}
// TODO: better error
- Err::Error(e) => return Err(Error::ParseError(e.to_string())),
- Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
+ Err::Error(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
+ Err::Failure(e) => {
+ return Err(Error::ParseError(input.to_string(), e.to_string()))
+ }
},
}
}
@@ -371,6 +565,7 @@ where
}
impl<R> Reader<R> {
+ /// Convert a start tag into an `Element` given a mutable document context.
fn start_tag_from_xml(
depth: &mut Vec<Name>,
namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
@@ -492,6 +687,7 @@ impl<R> Reader<R> {
});
}
+ /// Ensure an end tag is acceptable given a document context.
fn end_tag_from_xml(
depth: &mut Vec<Name>,
namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
@@ -539,6 +735,7 @@ impl<R> Reader<R> {
}
}
+ /// Convert an xml element (empty or not) into an `Element` given a mutable document context.
fn element_from_xml(
namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
element: xml::Element,
@@ -713,6 +910,7 @@ impl<R> Reader<R> {
});
}
+ /// Convert xml content into a `VecDeque` of `Content` given a document context.
fn content_from_xml(
namespaces: &mut Vec<HashSet<NamespaceDeclaration>>,
xml_content: xml::Content,