use circular::Buffer; use futures::Stream; use nom::Err; use std::{ collections::{BTreeMap, HashMap, HashSet}, path::Prefix, str::{self, FromStr}, }; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; static MAX_STANZA_SIZE: usize = 65536; use crate::{ element::{Content, Element, Name, Namespace}, error::Error, xml::{self, parsers::Parser}, Result, }; /// streaming reader that tracks depth and available namespaces at current depth pub struct Reader { inner: R, buffer: Buffer, // holds which tags we are in atm over depth // to have names reference namespaces could depth: Vec, namespaces: Vec>, } impl Reader { pub fn new(reader: R) -> Self { Self { inner: reader, buffer: Buffer::with_capacity(MAX_STANZA_SIZE), depth: Vec::new(), namespaces: Vec::new(), } } } impl Reader where R: AsyncRead + Unpin, { async fn read_buf(&mut self) -> Result { Ok(self.inner.read_buf(&mut self.buffer).await?) } async fn read_element<'s>(&'s mut self) -> Result { self.read_buf().await?; let mut input = str::from_utf8(self.buffer.data())?; loop { match xml::Element::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Reader::::element_from_xml(&mut self.namespaces, e)?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => { self.read_buf().await?; input = str::from_utf8(self.buffer.data())?; } // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } } impl Reader { fn element_from_xml( namespaces: &mut Vec>, element: xml::Element, ) -> Result { match element { xml::Element::Empty(empty_elem_tag) => { let mut namespace_declarations = HashSet::new(); for (prefix, namespace) in empty_elem_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = Namespace { prefix, namespace: namespace.process()?, }; if !namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpace(namespace)); } } // all namespaces available to the element (from both parent elements and element itself) let namespace_stack: Vec<&Namespace> = namespaces .iter() .flatten() .chain(namespace_declarations.iter()) .collect(); let mut attributes = HashMap::new(); for (q_name, value) in empty_elem_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let namespace; let attribute_name; match q_name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); attribute_name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); attribute_name = unprefixed_name.to_string(); } } if let Some(namespace) = namespace { let namespace = (*namespace).clone(); let name = Name { namespace, name: attribute_name, }; let value = value.process()?; if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let name; let namespace; match &empty_elem_tag.name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); name = unprefixed_name.to_string(); } } let namespace = (*namespace .ok_or_else(|| Error::UnqualifiedNamespace(empty_elem_tag.name.to_string()))?) .clone(); let name = Name { namespace, name }; return Ok(Element { name, namespace_decl: namespace_declarations, attributes, content: Vec::new(), }); } xml::Element::NotEmpty(s_tag, content, e_tag) => { if s_tag.name != e_tag.name { return Err(Error::MismatchedEndTag( s_tag.name.to_string(), e_tag.name.to_string(), )); } let mut namespace_declarations = HashSet::new(); for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = Namespace { prefix, namespace: namespace.process()?, }; if !namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpace(namespace)); } } // all namespaces available to the element (from both parent elements and element itself) let namespace_stack: Vec<&Namespace> = namespaces .iter() .flatten() .chain(namespace_declarations.iter()) .collect(); let mut attributes = HashMap::new(); for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let namespace; let attribute_name; match q_name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); attribute_name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); attribute_name = unprefixed_name.to_string(); } } if let Some(namespace) = namespace { let namespace = (*namespace).clone(); let name = Name { namespace, name: attribute_name, }; let value = value.process()?; if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let name; let namespace; match &s_tag.name { xml::QName::PrefixedName(prefixed_name) => { namespace = namespace_stack.iter().rfind(|namespace| { namespace.prefix.as_deref() == Some(**prefixed_name.prefix) }); name = prefixed_name.local_part.to_string(); } xml::QName::UnprefixedName(unprefixed_name) => { namespace = namespace_stack .iter() .rfind(|namespace| namespace.prefix == None); name = unprefixed_name.to_string(); } } let namespace = (*namespace .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))?) .clone(); let name = Name { namespace, name }; namespaces.push(namespace_declarations.clone()); let content = Self::content_from_xml(namespaces, content)?; namespaces.pop(); return Ok(Element { name, namespace_decl: namespace_declarations, attributes, content, }); } } } fn content_from_xml( namespaces: &mut Vec>, element: xml::Content, ) -> Result> { let mut content = Vec::new(); let mut text = element.char_data.map(|str| String::from(*str)); for (content_item, char_data) in element.content { match content_item { xml::ContentItem::Element(element) => { text.map(|text| content.push(Content::Text(text))); content.push(Content::Element(Self::element_from_xml( namespaces, element, )?)); text = char_data.map(|str| String::from(*str)); } xml::ContentItem::Reference(reference) => { let data = reference.process()?; if let Some(text) = &mut text { text.push(data) } else { text = Some(String::from(data)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } xml::ContentItem::CDSect(cd_sect) => { if let Some(text) = &mut text { text.push_str(**cd_sect) } else { text = Some(String::from(**cd_sect)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: is this important? xml::ContentItem::PI(pi) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: comments? xml::ContentItem::Comment(comment) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } } } text.map(|text| content.push(Content::Text(text))); todo!() } } // impl Reader // where // R: AsyncBufReadExt + Unpin, // { // /// could resursively read and include namespace tree with values to be shadowed within new local context // async fn read_recursive(&mut self, namespaces: BTreeMap, String>) -> Result { // let element; // let len; // loop { // let buf = self.inner.fill_buf().await?; // let input = str::from_utf8(buf)?; // match crate::xml::element(input) { // Ok((rest, e)) => { // element = e; // len = buf.len() - rest.len(); // break; // } // Err(e) => match e { // Err::Incomplete(_) => (), // e => return Err::(Error::ParseError(input.to_owned())), // }, // } // } // let final; // match element { // crate::xml::Element::Empty(e) => { // let final = Element { // } // }, // crate::xml::Element::NotEmpty(_, _, _) => todo!(), // } // self.inner.consume(len); // todo!() // } // /// reads entire next prolog, element, or misc // pub async fn read>(&mut self) -> Result { // let element; // let len; // loop { // let buf = self.inner.fill_buf().await?; // let input = str::from_utf8(buf)?; // match crate::xml::element(input) { // Ok((rest, e)) => { // element = e; // len = buf.len() - rest.len(); // break; // } // Err(e) => match e { // Err::Incomplete(_) => (), // e => return Err::(Error::ParseError(input.to_owned())), // }, // } // } // self.inner.consume(len); // // Ok(element) // todo!() // } // pub async fn read_start(&self) -> Result, Error> { // todo!() // } // pub async fn read_end(&self) -> Result<(), Error> { // todo!() // } // } // impl Stream for Reader { // type Item = impl From; // async fn poll_next( // self: std::pin::Pin<&mut Self>, // cx: &mut std::task::Context<'_>, // ) -> std::task::Poll> { // todo!() // } // }