diff options
Diffstat (limited to 'src/reader.rs')
-rw-r--r-- | src/reader.rs | 262 |
1 files changed, 230 insertions, 32 deletions
diff --git a/src/reader.rs b/src/reader.rs index 93b28af..a403171 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,12 +1,24 @@ +use std::io::Write; + use circular::Buffer; +#[cfg(target_arch = "wasm32")] +use js_sys::{ArrayBuffer, Uint8Array}; use nom::Err; use std::{ collections::{HashMap, HashSet, VecDeque}, str, }; -use tokio::io::{AsyncRead, AsyncReadExt}; -use tracing::{debug, info, trace}; - +use tokio::io::AsyncRead; +#[cfg(target_arch = "wasm32")] +use tokio::sync::mpsc; +use tracing::{info, trace}; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen::{closure::Closure, JsCast}; +#[cfg(target_arch = "wasm32")] +use web_sys::{Blob, MessageEvent}; + +#[cfg(target_arch = "wasm32")] +use crate::error::WebsocketError; use crate::{ declaration::{Declaration, VersionInfo}, element::{Content, Element, FromElement, Name, NamespaceDeclaration}, @@ -17,19 +29,144 @@ use crate::{ static MAX_STANZA_SIZE: usize = 65536; -/// streaming reader that tracks depth and available namespaces at current depth +/// Reader that tracks depth and corresponding declared/available namespaces. #[derive(Debug)] pub struct Reader<R> { inner: R, - pub buffer: Buffer, + buffer: Buffer, // holds which tags we are in atm over depth // to have names reference namespaces could depth: Vec<Name>, namespace_declarations: Vec<HashSet<NamespaceDeclaration>>, + unendable: bool, root_ended: bool, } +/// Represents a WebSocket Message, after converting from JavaScript type. +/// from https://github.com/najamelan/ws_stream_wasm/blob/dev/src/ws_message.rs +#[cfg(target_arch = "wasm32")] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum WsMessage { + /// The data of the message is a string. + /// + Text(String), + + /// The message contains binary data. + /// + Binary(Vec<u8>), +} + +/// This will convert the JavaScript event into a WsMessage. Note that this +/// will only work if the connection is set to use the binary type ArrayBuffer. +/// On binary type Blob, this will panic. +/// from https://github.com/najamelan/ws_stream_wasm/blob/dev/src/ws_message.rs +#[cfg(target_arch = "wasm32")] +impl TryFrom<MessageEvent> for WsMessage { + type Error = WebsocketError; + + fn try_from(evt: MessageEvent) -> std::result::Result<Self, Self::Error> { + match evt.data() { + d if d.is_instance_of::<ArrayBuffer>() => { + let buffy = Uint8Array::new(d.unchecked_ref()); + let mut v = vec![0; buffy.length() as usize]; + + buffy.copy_to(&mut v); // FIXME: get rid of this copy + + Ok(WsMessage::Binary(v)) + } + + // We don't allow invalid encodings. In principle if needed, + // we could add a variant to WsMessage with a CString or an OsString + // to allow the user to access this data. However until there is a usecase, + // I'm not inclined, amongst other things because the conversion from Js isn't very + // clear and it would require a bunch of testing for something that's a rather bad + // idea to begin with. If you need data that is not a valid string, use a binary + // message. + // + d if d.is_string() => match d.as_string() { + Some(text) => Ok(WsMessage::Text(text)), + None => Err(WebsocketError::InvalidEncoding), + }, + + // We have set the binary mode to array buffer (WsMeta::connect), so normally this shouldn't happen. + // That is as long as this is used within the context of the WsMeta constructor. + // + d if d.is_instance_of::<Blob>() => Err(WebsocketError::CantDecodeBlob), + + // should never happen. + // + _ => Err(WebsocketError::UnknownDataType), + } + } +} + +#[cfg(target_arch = "wasm32")] +#[derive(Debug)] +/// Receiver for websocket frames. Implements `Readable` for asynchronous XML reading. +pub struct WebSocketOnMessageRead { + queue: mpsc::UnboundedReceiver<WsMessage>, +} + +#[cfg(target_arch = "wasm32")] +impl WebSocketOnMessageRead { + /// Create a new `WebsocketOnMessageRead` with corresponding `on_message` event closure. + pub fn new() -> (Closure<dyn FnMut(MessageEvent)>, Self) { + let (send, recv) = mpsc::unbounded_channel(); + let on_msg = Closure::wrap(Box::new(move |msg_evt: MessageEvent| { + let msg_evt = msg_evt.try_into(); + match msg_evt { + Ok(msg_evt) => match send.send(msg_evt) { + Ok(()) => {} + Err(e) => { + tracing::error!("message event send error: {:?}", e); + } + }, + Err(e) => { + tracing::error!("websocket receive error: {}", e); + } + } + }) as Box<dyn FnMut(MessageEvent)>); + + (on_msg, Self { queue: recv }) + } +} + +#[cfg(target_arch = "wasm32")] +impl Readable for WebSocketOnMessageRead { + async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> { + let msg = self.queue.recv().await; + let msg = match msg { + Some(msg) => msg, + None => return Err(Error::WebSocketClosed), + }; + match msg { + WsMessage::Text(s) => { + let text = s.as_bytes(); + Ok(buffer.write(text)?) + } + WsMessage::Binary(v) => Ok(buffer.write(&v)?), + } + } +} + +/// Trait for abstracting asynchronous read streams. +pub trait Readable { + fn read_buf(&mut self, buffer: &mut Buffer) + -> impl std::future::Future<Output = Result<usize>>; +} + +/// String wrapper which implements Readable, for string parsing. +pub struct ReadableString(pub String); + +impl Readable for ReadableString { + async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> { + let string = self.0.split_off(0); + Ok(buffer.write(string.as_bytes())?) + } +} + impl<R> Reader<R> { + /// Create a new `Reader` which is constrained to a single root element. pub fn new(reader: R) -> Self { let mut default_declarations = HashSet::new(); default_declarations.insert(NamespaceDeclaration { @@ -46,25 +183,52 @@ impl<R> Reader<R> { depth: Vec::new(), // TODO: make sure reserved namespaces are never overwritten namespace_declarations: vec![default_declarations], + unendable: false, + root_ended: false, + } + } + + /// Create a new `Reader` which is not constrained to a single root element. + pub fn new_unendable(reader: R) -> Self { + let mut default_declarations = HashSet::new(); + default_declarations.insert(NamespaceDeclaration { + prefix: Some("xml".to_string()), + namespace: XML_NS.to_string(), + }); + default_declarations.insert(NamespaceDeclaration { + prefix: Some("xmlns".to_string()), + namespace: XMLNS_NS.to_string(), + }); + Self { + inner: reader, + buffer: Buffer::with_capacity(MAX_STANZA_SIZE), + depth: Vec::new(), + // TODO: make sure reserved namespaces are never overwritten + namespace_declarations: vec![default_declarations], + unendable: true, root_ended: false, } } + /// Extract the inner type from the `Reader`. pub fn into_inner(self) -> R { self.inner } } -impl<R> Reader<R> +impl<R> Readable for R where R: AsyncRead + Unpin, { - pub async fn read_buf<'s>(&mut self) -> Result<usize> { - Ok(self.inner.read_buf(&mut self.buffer).await?) + async fn read_buf(&mut self, buffer: &mut Buffer) -> Result<usize> { + Ok(tokio::io::AsyncReadExt::read_buf(self, buffer).await?) } +} +impl<R: Readable> Reader<R> { + /// Attempt to read an XML prolog, which could include an XML declaration, miscellaneous items (e.g. comments, processing instructions), and/or a doctype declaration. pub async fn read_prolog<'s>(&'s mut self) -> Result<Option<Declaration>> { - if self.root_ended { + if !self.unendable && self.root_ended { return Err(Error::RootElementEnded); } loop { @@ -104,30 +268,37 @@ where } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, } } } + /// Read a start tag, moving up in document depth, and convert it into a type which implements `FromElement`. pub async fn read_start<'s, T: FromElement>(&'s mut self) -> Result<T> { let element = self.read_start_tag().await?; trace!("read element start: {:?}", element); Ok(FromElement::from_element(element)?) } + /// Read a full element (start tag + content + end tag, or empty tag) and convert it into a type which implements `FromElement`. pub async fn read<'s, T: FromElement>(&'s mut self) -> Result<T> { let element = self.read_element().await?; trace!("read element: {:?}", element); Ok(FromElement::from_element(element)?) } + /// Read a start tag, moving up in document depth. pub async fn read_start_tag<'s>(&'s mut self) -> Result<Element> { - if self.root_ended { + if !self.unendable && self.root_ended { return Err(Error::RootElementEnded); } loop { @@ -147,18 +318,23 @@ where } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, } } } + /// Read an end tag, moving down in document depth. pub async fn read_end_tag<'s>(&'s mut self) -> Result<()> { - if self.root_ended { + if !self.unendable && self.root_ended { return Err(Error::RootElementEnded); } loop { @@ -181,18 +357,23 @@ where } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, } } } + /// Read a full element (start tag + content + end tag, or empty tag). pub async fn read_element<'s>(&'s mut self) -> Result<Element> { - if self.root_ended { + if !self.unendable && self.root_ended { return Err(Error::RootElementEnded); } loop { @@ -212,18 +393,23 @@ where } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, } } } + /// Read element content (text, another full element, a comment, a PI). pub async fn read_content<'s>(&'s mut self) -> Result<Content> { - if self.root_ended { + if !self.unendable && self.root_ended { return Err(Error::RootElementEnded); } let mut last_char = false; @@ -240,7 +426,7 @@ where } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } _ => match xml::ContentItem::parse(input) { Ok((rest, content_item)) => match content_item { @@ -297,11 +483,15 @@ where }, std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, }, }, @@ -358,11 +548,15 @@ where }, std::result::Result::Err(e) => match e { Err::Incomplete(_) => { - self.read_buf().await?; + self.inner.read_buf(&mut self.buffer).await?; } // TODO: better error - Err::Error(e) => return Err(Error::ParseError(e.to_string())), - Err::Failure(e) => return Err(Error::ParseError(e.to_string())), + Err::Error(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } + Err::Failure(e) => { + return Err(Error::ParseError(input.to_string(), e.to_string())) + } }, } } @@ -371,6 +565,7 @@ where } impl<R> Reader<R> { + /// Convert a start tag into an `Element` given a mutable document context. fn start_tag_from_xml( depth: &mut Vec<Name>, namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>, @@ -492,6 +687,7 @@ impl<R> Reader<R> { }); } + /// Ensure an end tag is acceptable given a document context. fn end_tag_from_xml( depth: &mut Vec<Name>, namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>, @@ -539,6 +735,7 @@ impl<R> Reader<R> { } } + /// Convert an xml element (empty or not) into an `Element` given a mutable document context. fn element_from_xml( namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>, element: xml::Element, @@ -713,6 +910,7 @@ impl<R> Reader<R> { }); } + /// Convert xml content into a `VecDeque` of `Content` given a document context. fn content_from_xml( namespaces: &mut Vec<HashSet<NamespaceDeclaration>>, xml_content: xml::Content, |