use circular::Buffer; use futures::{FutureExt, Stream}; use nom::Err; use std::{ collections::{BTreeMap, HashMap, HashSet}, future::Future, path::Prefix, pin::{pin, Pin}, str::{self, FromStr}, }; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; static MAX_STANZA_SIZE: usize = 65536; use crate::{ element::{Content, Element, Name, Namespace}, error::Error, xml::{self, parsers::Parser}, Result, }; /// streaming reader that tracks depth and available namespaces at current depth pub struct Reader<R> { inner: R, buffer: Buffer, // holds which tags we are in atm over depth // to have names reference namespaces could depth: Vec<Name>, namespaces: Vec<HashSet<Namespace>>, } impl<R> Reader<R> { pub fn new(reader: R) -> Self { Self { inner: reader, buffer: Buffer::with_capacity(MAX_STANZA_SIZE), depth: Vec::new(), namespaces: Vec::new(), } } } impl<R> Reader<R> where R: AsyncRead + Unpin, { async fn read_buf<'s>(&mut self) -> Result<usize> { Ok(self.inner.read_buf(&mut self.buffer).await?) } async fn read_prolog<'s>(&'s mut self) -> Result<()> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::Prolog::parse(input) { Ok((rest, _prolog)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); self.buffer.consume(len); return Ok(()); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_start_tag<'s>(&'s mut self) -> Result<Element> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::STag::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Reader::<R>::start_tag_from_xml(&mut self.depth, &mut self.namespaces, e)?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_end_tag<'s>(&'s mut self) -> Result<()> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::ETag::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); Reader::<R>::end_tag_from_xml(&mut self.depth, &mut self.namespaces, e)?; self.buffer.consume(len); return Ok(()); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_element<'s>(&'s mut self) -> Result<Element> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::Element::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Reader::<R>::element_from_xml(&mut self.namespaces, e)?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_content<'s>(&'s mut self) -> Result<Content> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::ContentItem::parse(input) { Ok((rest, c)) => { match c { xml::ContentItem::CharData(char_data) => todo!(), xml::ContentItem::Element(element) => todo!(), xml::ContentItem::Reference(reference) => todo!(), xml::ContentItem::CDSect(cdsect) => todo!(), xml::ContentItem::PI(pi) => todo!(), xml::ContentItem::Comment(comment) => todo!(), } let len = self.buffer.available_data() - rest.as_bytes().len(); let content = Reader::<R>::content_item_from_xml(&mut self.namespaces, e)?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } } impl<R> Reader<R> { fn content_item_from_xml( namespaces: &mut Vec<HashSet<Namespace>>, item: xml::ContentItem, ) -> Result<Content> { todo!() } fn start_tag_from_xml( depth: &mut Vec<Name>, namespaces: &mut Vec<HashSet<Namespace>>, s_tag: xml::STag, ) -> Result<Element> { let mut namespace_declarations = HashSet::new(); for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = Namespace { prefix, namespace: namespace.process()?, }; if !namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpace(namespace)); } } // all namespaces available to the element (from both parent elements and element itself) let namespace_stack: Vec<&Namespace> = namespaces .iter() .flatten() .chain(namespace_declarations.iter()) .collect(); let mut attributes = HashMap::new(); for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let namespace; let attribute_name; match q_name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); attribute_name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); attribute_name = unprefixed_name.to_string(); } } if let Some(namespace) = namespace { let namespace = (*namespace).clone(); let name = Name { namespace, name: attribute_name, }; let value = value.process()?; if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let name; let namespace; match &s_tag.name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix.as_deref() == Some(**prefixed_name.prefix)); name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); name = unprefixed_name.to_string(); } } let namespace = (*namespace .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))?) .clone(); let name = Name { namespace, name }; depth.push(name.clone()); namespaces.push(namespace_declarations.clone()); return Ok(Element { name, namespace_decl: namespace_declarations, attributes, content: Vec::new(), }); } fn end_tag_from_xml( depth: &mut Vec<Name>, namespaces: &mut Vec<HashSet<Namespace>>, e_tag: xml::ETag, ) -> Result<()> { if let Some(s_tag_name) = depth.pop() { if s_tag_name.namespace.prefix.as_deref() == e_tag.name.prefix() && s_tag_name.name == e_tag.name.local_part() { namespaces.pop(); return Ok(()); } else { return Err(Error::MismatchedEndTag( s_tag_name.name, e_tag.name.to_string(), )); } } else { return Err(Error::NotInElement(e_tag.name.to_string())); } } fn element_from_xml( namespaces: &mut Vec<HashSet<Namespace>>, element: xml::Element, ) -> Result<Element> { match element { xml::Element::Empty(empty_elem_tag) => { let mut namespace_declarations = HashSet::new(); for (prefix, namespace) in empty_elem_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = Namespace { prefix, namespace: namespace.process()?, }; if !namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpace(namespace)); } } // all namespaces available to the element (from both parent elements and element itself) let namespace_stack: Vec<&Namespace> = namespaces .iter() .flatten() .chain(namespace_declarations.iter()) .collect(); let mut attributes = HashMap::new(); for (q_name, value) in empty_elem_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let namespace; let attribute_name; match q_name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); attribute_name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); attribute_name = unprefixed_name.to_string(); } } if let Some(namespace) = namespace { let namespace = (*namespace).clone(); let name = Name { namespace, name: attribute_name, }; let value = value.process()?; if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let name; let namespace; match &empty_elem_tag.name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); name = unprefixed_name.to_string(); } } let namespace = (*namespace .ok_or_else(|| Error::UnqualifiedNamespace(empty_elem_tag.name.to_string()))?) .clone(); let name = Name { namespace, name }; return Ok(Element { name, namespace_decl: namespace_declarations, attributes, content: Vec::new(), }); } xml::Element::NotEmpty(s_tag, content, e_tag) => { if s_tag.name != e_tag.name { return Err(Error::MismatchedEndTag( s_tag.name.to_string(), e_tag.name.to_string(), )); } let mut namespace_declarations = HashSet::new(); for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = Namespace { prefix, namespace: namespace.process()?, }; if !namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpace(namespace)); } } // all namespaces available to the element (from both parent elements and element itself) let namespace_stack: Vec<&Namespace> = namespaces .iter() .flatten() .chain(namespace_declarations.iter()) .collect(); let mut attributes = HashMap::new(); for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let namespace; let attribute_name; match q_name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); attribute_name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); attribute_name = unprefixed_name.to_string(); } } if let Some(namespace) = namespace { let namespace = (*namespace).clone(); let name = Name { namespace, name: attribute_name, }; let value = value.process()?; if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let name; let namespace; match &s_tag.name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); name = unprefixed_name.to_string(); } } let namespace = (*namespace .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))?) .clone(); let name = Name { namespace, name }; namespaces.push(namespace_declarations.clone()); let content = Self::content_from_xml(namespaces, content)?; namespaces.pop(); return Ok(Element { name, namespace_decl: namespace_declarations, attributes, content, }); } } } fn content_from_xml( namespaces: &mut Vec<HashSet<Namespace>>, element: xml::Content, ) -> Result<Vec<Content>> { let mut content = Vec::new(); let mut text = element.char_data.map(|str| String::from(*str)); for (content_item, char_data) in element.content { match content_item { xml::ContentItem::Element(element) => { text.map(|text| content.push(Content::Text(text))); content.push(Content::Element(Self::element_from_xml( namespaces, element, )?)); text = char_data.map(|str| String::from(*str)); } xml::ContentItem::Reference(reference) => { let data = reference.process()?; if let Some(text) = &mut text { text.push(data) } else { text = Some(String::from(data)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } xml::ContentItem::CDSect(cd_sect) => { if let Some(text) = &mut text { text.push_str(**cd_sect) } else { text = Some(String::from(**cd_sect)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: is this important? xml::ContentItem::PI(pi) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: comments? xml::ContentItem::Comment(comment) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } } } text.map(|text| content.push(Content::Text(text))); Ok(content) } } impl<R: AsyncRead + Unpin> Stream for Reader<R> { type Item = Result<Content>; fn poll_next( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { let mut e = self; let mut pinned = pin!(e.read_content()); pinned.as_mut().poll(cx).map(|result| Some(result)) } } #[cfg(test)] mod test { use futures::{sink::Buffer, StreamExt}; use tokio::io::AsyncRead; use super::Reader; struct MockAsyncReader<'s>(&'s str); impl<'s> MockAsyncReader<'s> { fn new(data: &'s str) -> Self { Self(data) } } impl<'s> AsyncRead for MockAsyncReader<'s> { fn poll_read( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll<std::io::Result<()>> { buf.put_slice(self.0.as_bytes()); std::task::Poll::Ready(Ok(())) } } const TEST_DOC: &'static str = "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema' targetNamespace='http://etherx.jabber.org/streams' xmlns='http://etherx.jabber.org/streams' elementFormDefault='unqualified'> <xs:import namespace='jabber:client'/> <xs:import namespace='jabber:server'/> <xs:import namespace='urn:ietf:params:xml:ns:xmpp-sasl'/> <xs:import namespace='urn:ietf:params:xml:ns:xmpp-streams'/> <xs:import namespace='urn:ietf:params:xml:ns:xmpp-tls'/> <xs:element name='stream'> <xs:complexType> <xs:sequence xmlns:client='jabber:client' xmlns:server='jabber:server'> <xs:element ref='features' minOccurs='0' maxOccurs='1'/> <xs:any namespace='urn:ietf:params:xml:ns:xmpp-tls' minOccurs='0' maxOccurs='1'/> <xs:any namespace='urn:ietf:params:xml:ns:xmpp-sasl' minOccurs='0' maxOccurs='1'/> <xs:any namespace='##other' minOccurs='0' maxOccurs='unbounded' processContents='lax'/> <xs:choice minOccurs='0' maxOccurs='1'> <xs:choice minOccurs='0' maxOccurs='unbounded'> <xs:element ref='client:message'/> <xs:element ref='client:presence'/> <xs:element ref='client:iq'/> </xs:choice> <xs:choice minOccurs='0' maxOccurs='unbounded'> <xs:element ref='server:message'/> <xs:element ref='server:presence'/> <xs:element ref='server:iq'/> </xs:choice> </xs:choice> <xs:element ref='error' minOccurs='0' maxOccurs='1'/> </xs:sequence> <xs:attribute name='from' type='xs:string' use='optional'/> <xs:attribute name='id' type='xs:string' use='optional'/> <xs:attribute name='to' type='xs:string' use='optional'/> <xs:attribute name='version' type='xs:decimal' use='optional'/> <xs:attribute ref='xml:lang' use='optional'/> <xs:anyAttribute namespace='##other' processContents='lax'/> </xs:complexType> </xs:element> <xs:element name='features'> <xs:complexType> <xs:sequence> <xs:any namespace='##other' minOccurs='0' maxOccurs='unbounded' processContents='lax'/> </xs:sequence> </xs:complexType> </xs:element> <xs:element name='error'> <xs:complexType> <xs:sequence xmlns:err='urn:ietf:params:xml:ns:xmpp-streams'> <xs:group ref='err:streamErrorGroup'/> <xs:element ref='err:text' minOccurs='0' maxOccurs='1'/> <xs:any namespace='##other' minOccurs='0' maxOccurs='1' processContents='lax'/> </xs:sequence> </xs:complexType> </xs:element> </xs:schema>asdf"; #[tokio::test] async fn test_element_read() { let mock = MockAsyncReader::new(TEST_DOC); let mut reader = Reader::new(mock); let element = reader.read_element().await.unwrap(); println!("{:#?}", element); } #[tokio::test] async fn test_element_stream() { let mock = MockAsyncReader::new(TEST_DOC); let mut reader = Reader::new(mock); let element = reader.read_start_tag().await.unwrap(); println!("{:#?}", element); loop { let element = reader.next().await.unwrap(); println!("{:#?}", element); } } }