aboutsummaryrefslogblamecommitdiffstats
path: root/src/reader.rs
blob: 24cc09828c5b6b6b211f877d8044cce1ed344169 (plain) (tree)
1
2
3
4
5
6
7
8
                     
                                 
             
          
                                                                  
                   
                 
                    


                                                                        
                   
 
            

                                                                         
                 
                                 
                             

  

                                      
                                                                                
                
                      
             
                       
                                                
                                               
                     
                                                               
                     

 

                                   








                                                          

                          
                                                           
                              

                                                                        
                              

         



                                  

 



                         
                                                           


                                                        
                                                                               


                                                
              

                                                            
                                                             
                                                                                   

















                                                                                     

                                                        


                                               







                                                                                    

                                                                            
                                               



                                                                      
                                              
                                               

     
                                                                      


                                                
              



                                                                                   




                                                                  



                                                        


                                               







                                                                                    
                                                               


                                                
              



                                                                                   




                                                         


                                              



                                                        


                                               







                                                                                    
                                                                    


                                                
              
                                                            


                                                                                   

                                                                                            


                                              



                                                        


                                               







                                                                                    
                                                                    


                                                

                                     
              
                                                            






                                                                                       
                     
                                                            


                                                   







                                                                                                 



                                                                             


                                                                  






































                                                                                                   


                                                           










                                                                                                
                                                      



                                                                                               



                                                                     


                                                          


































                                                                                               


                                                   



                                                                                        
                 





                   

                              
                                                                        

                          


                                                                












                                                                                        
                                                  


                                                
                                                                          
                                                                            


             

                                                                                                       

                      
                                                         

                       
                       
 















                                                                                    

                                                                             








                                                 






                                                                               









                                                                                       
                 
                                                               
             
                     
                                                                                  

                                                                             
                                                     
                  
                    








                                                                          


             
                                         
 
                                                                            

                           
                               
                                                                            
                       
                                     




                              

                                                                        

                                               







                                                                              
                               
                                                                                        
                 

                                                                              
                               
                                                                     

                 

                                                             
                                                   

                         
                                   

                                             
              



                                                                            
                                         
                                             

                              
                                                                            

                
                                                                        


         
                        
                                                                        

                              




                           

                                                    











                                                              
 
                                            
 










                                                                                        
                 









                                                                            
 





                                                                                                       
 
                                                       
 

                                                                   
 




                                                                                    
             

                                                                            
                           


                                                                 
 

                                                                             
 



                                                 
 


















                                                                                        
                                                                                                 










                                                                              















                                                                             
                                   


                                                                                       
                 
                                                               
             
                     
                                                                                  

                                                                             
                                                     
                  
                    








                                                                          

             
 


                                                                                
 
                                                                                   
 

                                         
                                      
         


                               
                                                                            


                       


                        
                                                            
                                  

                                          

                                                                           

                                                       

                                             
                                                                  

                         
                                                                              






























                                                                                             

                                 
                                                      

             
                   


     
            
              



                             
                                    


                      

                                  
                                           
                                     








                                                   



                                                    



                                          
                                                  












































































                                                                        




                                                  




                                                           
use circular::Buffer;
use futures::{FutureExt, Stream};
use nom::Err;
use std::{
    collections::{hash_set, BTreeMap, HashMap, HashSet, VecDeque},
    future::Future,
    path::Prefix,
    pin::{pin, Pin},
    str::{self, FromStr},
};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt};
use tracing::debug;

use crate::{
    declaration::{Declaration, VersionInfo},
    element::{Content, Element, FromElement, Name, NamespaceDeclaration},
    error::Error,
    xml::{self, parsers::Parser},
    Result, XMLNS_NS, XML_NS,
};

static MAX_STANZA_SIZE: usize = 65536;

/// streaming reader that tracks depth and available namespaces at current depth
#[derive(Debug)]
pub struct Reader<R> {
    inner: R,
    pub buffer: Buffer,
    // holds which tags we are in atm over depth
    // to have names reference namespaces could
    depth: Vec<Name>,
    namespace_declarations: Vec<HashSet<NamespaceDeclaration>>,
    root_ended: bool,
}

impl<R> Reader<R> {
    pub fn new(reader: R) -> Self {
        let mut default_declarations = HashSet::new();
        default_declarations.insert(NamespaceDeclaration {
            prefix: Some("xml".to_string()),
            namespace: XML_NS.to_string(),
        });
        default_declarations.insert(NamespaceDeclaration {
            prefix: Some("xmlns".to_string()),
            namespace: XMLNS_NS.to_string(),
        });
        Self {
            inner: reader,
            buffer: Buffer::with_capacity(MAX_STANZA_SIZE),
            depth: Vec::new(),
            // TODO: make sure reserved namespaces are never overwritten
            namespace_declarations: vec![default_declarations],
            root_ended: false,
        }
    }

    pub fn into_inner(self) -> R {
        self.inner
    }
}

impl<R> Reader<R>
where
    R: AsyncRead + Unpin,
{
    pub async fn read_buf<'s>(&mut self) -> Result<usize> {
        Ok(self.inner.read_buf(&mut self.buffer).await?)
    }

    pub async fn read_prolog<'s>(&'s mut self) -> Result<Option<Declaration>> {
        if self.root_ended {
            return Err(Error::RootElementEnded);
        }
        loop {
            let input = str::from_utf8(self.buffer.data())?;
            match xml::Prolog::parse(input) {
                Ok((rest, (decl, _misc, _doctype_decl))) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    // TODO: return error if there is a doctype decl
                    if let Some(decl) = decl {
                        let declaration = Declaration {
                            version_info: match *decl.version_info {
                                xml::VersionNum::One => VersionInfo::One,
                                xml::VersionNum::OneDotOne => VersionInfo::OneDotOne,
                            },
                            encoding_decl: decl
                                .encoding_decl
                                .map(|encoding_decl| (**encoding_decl).to_string()),
                            sd_decl: decl.sd_decl.map(|sd_decl| *sd_decl),
                        };
                        self.buffer.consume(len);
                        return Ok(Some(declaration));
                    } else {
                        self.buffer.consume(len);
                        return Ok(None);
                    }
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {
                        self.read_buf().await?;
                    }
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    pub async fn read_start<'s, T: FromElement>(&'s mut self) -> Result<T> {
        let element = self.read_start_tag().await?;
        Ok(FromElement::from_element(element)?)
    }

    pub async fn read<'s, T: FromElement>(&'s mut self) -> Result<T> {
        let element = self.read_element().await?;
        debug!("read element: {:?}", element);
        Ok(FromElement::from_element(element)?)
    }

    pub async fn read_start_tag<'s>(&'s mut self) -> Result<Element> {
        if self.root_ended {
            return Err(Error::RootElementEnded);
        }
        loop {
            let input = str::from_utf8(self.buffer.data())?;
            match xml::STag::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    let element = Reader::<R>::start_tag_from_xml(
                        &mut self.depth,
                        &mut self.namespace_declarations,
                        e,
                    )?;
                    self.buffer.consume(len);
                    return Ok(element);
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {
                        self.read_buf().await?;
                    }
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    pub async fn read_end_tag<'s>(&'s mut self) -> Result<()> {
        if self.root_ended {
            return Err(Error::RootElementEnded);
        }
        loop {
            let input = str::from_utf8(self.buffer.data())?;
            match xml::ETag::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    Reader::<R>::end_tag_from_xml(
                        &mut self.depth,
                        &mut self.namespace_declarations,
                        e,
                    )?;
                    if self.depth.is_empty() {
                        self.root_ended = true
                    }
                    self.buffer.consume(len);
                    return Ok(());
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {
                        self.read_buf().await?;
                    }
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    pub async fn read_element<'s>(&'s mut self) -> Result<Element> {
        if self.root_ended {
            return Err(Error::RootElementEnded);
        }
        loop {
            let input = str::from_utf8(self.buffer.data())?;
            match xml::Element::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    let element =
                        Reader::<R>::element_from_xml(&mut self.namespace_declarations, e)?;
                    if self.depth.is_empty() {
                        self.root_ended = true
                    }
                    self.buffer.consume(len);
                    return Ok(element);
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {
                        self.read_buf().await?;
                    }
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    pub async fn read_content<'s>(&'s mut self) -> Result<Content> {
        if self.root_ended {
            return Err(Error::RootElementEnded);
        }
        let mut last_char = false;
        let mut text = String::new();
        loop {
            let input = str::from_utf8(self.buffer.data())?;
            if last_char == false {
                match xml::CharData::parse(input) {
                    Ok((rest, char_data)) => {
                        let len = self.buffer.available_data() - rest.as_bytes().len();
                        text.push_str(*char_data);
                        self.buffer.consume(len);
                        last_char = true;
                    }
                    std::result::Result::Err(e) => match e {
                        Err::Incomplete(_) => {
                            self.read_buf().await?;
                        }
                        _ => match xml::ContentItem::parse(input) {
                            Ok((rest, content_item)) => match content_item {
                                xml::ContentItem::Element(element) => {
                                    if !text.is_empty() {
                                        return Ok(Content::Text(text));
                                    } else {
                                        let len =
                                            self.buffer.available_data() - rest.as_bytes().len();
                                        let element = Self::element_from_xml(
                                            &mut self.namespace_declarations,
                                            element,
                                        )?;
                                        if self.depth.is_empty() {
                                            self.root_ended = true
                                        }
                                        self.buffer.consume(len);
                                        return Ok(Content::Element(element));
                                    }
                                }
                                xml::ContentItem::Reference(reference) => {
                                    let len = self.buffer.available_data() - rest.as_bytes().len();
                                    text.push(reference.process()?);
                                    self.buffer.consume(len);
                                    continue;
                                }
                                xml::ContentItem::CDSect(cd_sect) => {
                                    let len = self.buffer.available_data() - rest.as_bytes().len();
                                    text.push_str(**cd_sect);
                                    self.buffer.consume(len);
                                    continue;
                                }
                                xml::ContentItem::PI(_pi) => {
                                    if !text.is_empty() {
                                        return Ok(Content::Text(text));
                                    } else {
                                        let len =
                                            self.buffer.available_data() - rest.as_bytes().len();
                                        self.buffer.consume(len);
                                        return Ok(Content::PI);
                                    }
                                }
                                xml::ContentItem::Comment(comment) => {
                                    if !text.is_empty() {
                                        return Ok(Content::Text(text));
                                    } else {
                                        let len =
                                            self.buffer.available_data() - rest.as_bytes().len();
                                        let comment = comment.to_string();
                                        self.buffer.consume(len);
                                        return Ok(Content::Comment(comment));
                                    }
                                }
                            },
                            std::result::Result::Err(e) => match e {
                                Err::Incomplete(_) => {
                                    self.read_buf().await?;
                                }
                                // TODO: better error
                                Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                                Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                            },
                        },
                    },
                }
            } else {
                match xml::ContentItem::parse(input) {
                    Ok((rest, content_item)) => match content_item {
                        xml::ContentItem::Element(element) => {
                            // text can still be empty
                            if !text.is_empty() {
                                return Ok(Content::Text(text));
                            } else {
                                let len = self.buffer.available_data() - rest.as_bytes().len();
                                let element = Self::element_from_xml(
                                    &mut self.namespace_declarations,
                                    element,
                                )?;
                                if self.depth.is_empty() {
                                    self.root_ended = true
                                }
                                self.buffer.consume(len);
                                return Ok(Content::Element(element));
                            }
                        }
                        xml::ContentItem::Reference(reference) => {
                            let len = self.buffer.available_data() - rest.as_bytes().len();
                            text.push(reference.process()?);
                            self.buffer.consume(len);
                            last_char = false;
                            continue;
                        }
                        xml::ContentItem::CDSect(cd_sect) => {
                            let len = self.buffer.available_data() - rest.as_bytes().len();
                            text.push_str(**cd_sect);
                            self.buffer.consume(len);
                            last_char = false;
                            continue;
                        }
                        xml::ContentItem::PI(_pi) => {
                            if !text.is_empty() {
                                return Ok(Content::Text(text));
                            } else {
                                let len = self.buffer.available_data() - rest.as_bytes().len();
                                self.buffer.consume(len);
                                return Ok(Content::PI);
                            }
                        }
                        xml::ContentItem::Comment(comment) => {
                            let len = self.buffer.available_data() - rest.as_bytes().len();
                            let comment = comment.to_string();
                            self.buffer.consume(len);
                            return Ok(Content::Comment(comment));
                        }
                    },
                    std::result::Result::Err(e) => match e {
                        Err::Incomplete(_) => {
                            self.read_buf().await?;
                        }
                        // TODO: better error
                        Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                        Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                    },
                }
            }
        }
    }
}

impl<R> Reader<R> {
    fn start_tag_from_xml(
        depth: &mut Vec<Name>,
        namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
        s_tag: xml::STag,
    ) -> Result<Element> {
        // namespace declarations on element

        let mut element_namespace_declarations = HashSet::new();
        for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| {
            if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute {
                Some((ns_name, value))
            } else {
                None
            }
        }) {
            let prefix = match prefix {
                xml::NSAttName::PrefixedAttName(prefixed_att_name) => {
                    Some(prefixed_att_name.to_string())
                }
                xml::NSAttName::DefaultAttName => None,
            };
            let namespace = NamespaceDeclaration {
                prefix,
                namespace: namespace.process()?,
            };
            if !element_namespace_declarations.insert(namespace.clone()) {
                return Err(Error::DuplicateNameSpaceDeclaration(namespace));
            }
        }

        // all namespaces available in the element scope (from both parent elements and element itself)
        let namespace_declarations_stack: Vec<&NamespaceDeclaration> = namespace_declarations
            .iter()
            .flatten()
            .chain(element_namespace_declarations.iter())
            .collect();

        // element name

        let element_namespace_declaration;
        let element_local_name = s_tag.name.local_part().to_string();

        match s_tag.name.prefix() {
            Some(prefix) => {
                element_namespace_declaration = namespace_declarations_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix));
            }
            None => {
                element_namespace_declaration = namespace_declarations_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix == None);
            }
        }

        let element_default_namespace =
            element_namespace_declaration.map(|decl| decl.namespace.clone());

        let element_name = Name {
            namespace: element_default_namespace,
            local_name: element_local_name,
        };

        // attributes

        let mut attributes = HashMap::new();
        for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| {
            if let xml::Attribute::Attribute { name, value } = attribute {
                Some((name, value))
            } else {
                None
            }
        }) {
            let attribute_namespace_declaration;
            let attribute_local_name = q_name.local_part().to_string();
            match q_name.prefix() {
                Some(prefix) => {
                    attribute_namespace_declaration =
                        namespace_declarations_stack
                            .iter()
                            .rfind(|namespace_declaration| {
                                namespace_declaration.prefix.as_deref() == Some(prefix)
                            });
                }
                None => attribute_namespace_declaration = None,
            }
            let name;
            if let Some(namespace_declaration) = attribute_namespace_declaration {
                name = Name {
                    namespace: Some(namespace_declaration.namespace.clone()),
                    local_name: attribute_local_name,
                };
            } else {
                name = Name {
                    namespace: None,
                    local_name: attribute_local_name,
                };
            }
            let value = value.process()?;
            // check for duplicate attribute
            if let Some(_value) = attributes.insert(name, value) {
                return Err(Error::DuplicateAttribute(q_name.to_string()));
            }
        }

        depth.push(element_name.clone());

        namespace_declarations.push(element_namespace_declarations.clone());

        return Ok(Element {
            name: element_name,
            namespace_declaration_overrides: element_namespace_declarations,
            attributes,
            content: VecDeque::new(),
        });
    }

    fn end_tag_from_xml(
        depth: &mut Vec<Name>,
        namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
        xml_e_tag: xml::ETag,
    ) -> Result<()> {
        if let Some(s_tag_name) = depth.pop() {
            let e_tag_namespace_declaration;
            let e_tag_local_name = xml_e_tag.name.local_part().to_string();
            let namespace_declarations_stack: Vec<_> =
                namespace_declarations.iter().flatten().collect();

            match xml_e_tag.name.prefix() {
                Some(prefix) => {
                    e_tag_namespace_declaration = namespace_declarations_stack
                        .iter()
                        .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix));
                }
                None => {
                    e_tag_namespace_declaration = namespace_declarations_stack
                        .iter()
                        .rfind(|namespace| namespace.prefix == None);
                }
            }

            let e_tag_namespace = e_tag_namespace_declaration
                .map(|decl| decl.namespace.clone())
                .clone();

            let e_tag_name = Name {
                namespace: e_tag_namespace,
                local_name: e_tag_local_name,
            };

            if e_tag_name != s_tag_name {
                return Err(Error::MismatchedEndTag(s_tag_name, e_tag_name));
            }
            if s_tag_name == e_tag_name {
                namespace_declarations.pop();
                return Ok(());
            } else {
                return Err(Error::MismatchedEndTag(s_tag_name, e_tag_name));
            }
        } else {
            return Err(Error::NotInElement(xml_e_tag.name.to_string()));
        }
    }

    fn element_from_xml(
        namespace_declarations: &mut Vec<HashSet<NamespaceDeclaration>>,
        element: xml::Element,
    ) -> Result<Element> {
        let xml_name;
        let xml_attributes;
        let xml_content;
        let xml_e_name;

        match element {
            xml::Element::Empty(empty_elem_tag) => {
                xml_name = empty_elem_tag.name;
                xml_attributes = empty_elem_tag.attributes;
                xml_content = None;
                xml_e_name = None;
            }
            xml::Element::NotEmpty(s_tag, content, e_tag) => {
                xml_name = s_tag.name;
                xml_attributes = s_tag.attributes;
                xml_content = Some(content);
                xml_e_name = Some(e_tag.name);
            }
        }

        // namespace declarations on element

        let mut element_namespace_declarations = HashSet::new();
        for (prefix, namespace) in xml_attributes.iter().filter_map(|attribute| {
            if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute {
                Some((ns_name, value))
            } else {
                None
            }
        }) {
            let prefix = match prefix {
                xml::NSAttName::PrefixedAttName(prefixed_att_name) => {
                    Some(prefixed_att_name.to_string())
                }
                xml::NSAttName::DefaultAttName => None,
            };
            let namespace = NamespaceDeclaration {
                prefix,
                namespace: namespace.process()?,
            };
            if !element_namespace_declarations.insert(namespace.clone()) {
                return Err(Error::DuplicateNameSpaceDeclaration(namespace));
            }
        }

        // all namespaces available in the element scope (from both parent elements and element itself)
        let namespace_declarations_stack: Vec<&NamespaceDeclaration> = namespace_declarations
            .iter()
            .flatten()
            .chain(element_namespace_declarations.iter())
            .collect();

        // element name and default attribute namespace

        let element_namespace_declaration;
        let element_local_name = xml_name.local_part().to_string();

        match xml_name.prefix() {
            Some(prefix) => {
                element_namespace_declaration = namespace_declarations_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix));
            }
            None => {
                element_namespace_declaration = namespace_declarations_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix == None);
            }
        }

        let element_default_namespace =
            element_namespace_declaration.map(|decl| decl.namespace.clone());

        let element_name = Name {
            namespace: element_default_namespace,
            local_name: element_local_name,
        };

        // end tag name match check

        if let Some(xml_e_name) = xml_e_name {
            let e_tag_namespace_declaration;
            let e_tag_local_name = xml_e_name.local_part().to_string();

            match xml_e_name.prefix() {
                Some(prefix) => {
                    e_tag_namespace_declaration = namespace_declarations_stack
                        .iter()
                        .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix));
                }
                None => {
                    e_tag_namespace_declaration = namespace_declarations_stack
                        .iter()
                        .rfind(|namespace| namespace.prefix == None);
                }
            }

            let e_tag_namespace = e_tag_namespace_declaration.map(|decl| decl.namespace.clone());

            let e_tag_name = Name {
                namespace: e_tag_namespace,
                local_name: e_tag_local_name,
            };

            if e_tag_name != element_name {
                return Err(Error::MismatchedEndTag(element_name, e_tag_name));
            }
        }

        // attributes

        let mut attributes = HashMap::new();
        for (q_name, value) in xml_attributes.iter().filter_map(|attribute| {
            if let xml::Attribute::Attribute { name, value } = attribute {
                Some((name, value))
            } else {
                None
            }
        }) {
            let attribute_namespace_declaration;
            let attribute_local_name = q_name.local_part().to_string();
            match q_name.prefix() {
                Some(prefix) => {
                    attribute_namespace_declaration =
                        namespace_declarations_stack
                            .iter()
                            .rfind(|namespace_declaration| {
                                namespace_declaration.prefix.as_deref() == Some(prefix)
                            });
                }
                None => attribute_namespace_declaration = None,
            }
            let name;
            if let Some(namespace_declaration) = attribute_namespace_declaration {
                name = Name {
                    namespace: Some(namespace_declaration.namespace.clone()),
                    local_name: attribute_local_name,
                };
            } else {
                name = Name {
                    namespace: None,
                    local_name: attribute_local_name,
                };
            }
            let value = value.process()?;
            // check for duplicate attribute
            if let Some(_value) = attributes.insert(name, value) {
                return Err(Error::DuplicateAttribute(q_name.to_string()));
            }
        }

        let content;
        if let Some(xml_content) = xml_content {
            namespace_declarations.push(element_namespace_declarations.clone());

            content = Self::content_from_xml(namespace_declarations, xml_content)?;

            namespace_declarations.pop();
        } else {
            content = VecDeque::new();
        }

        return Ok(Element {
            name: element_name,
            namespace_declaration_overrides: element_namespace_declarations,
            attributes,
            content,
        });
    }

    fn content_from_xml(
        namespaces: &mut Vec<HashSet<NamespaceDeclaration>>,
        xml_content: xml::Content,
    ) -> Result<VecDeque<Content>> {
        let mut content = VecDeque::new();
        let mut text = xml_content.char_data.map(|str| String::from(*str));
        for (content_item, char_data) in xml_content.content {
            match content_item {
                xml::ContentItem::Element(element) => {
                    text.map(|text| {
                        if !text.is_empty() {
                            content.push_back(Content::Text(text))
                        }
                    });
                    content.push_back(Content::Element(Self::element_from_xml(
                        namespaces, element,
                    )?));
                    text = char_data.map(|str| String::from(*str));
                }
                xml::ContentItem::Reference(reference) => {
                    let data = reference.process()?;
                    if let Some(text) = &mut text {
                        text.push(data)
                    } else {
                        text = Some(String::from(data))
                    }
                    char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data)));
                }
                xml::ContentItem::CDSect(cd_sect) => {
                    if let Some(text) = &mut text {
                        text.push_str(**cd_sect)
                    } else {
                        text = Some(String::from(**cd_sect))
                    }
                    char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data)));
                }
                // TODO: is this important?
                xml::ContentItem::PI(pi) => {
                    char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data)));
                }
                // TODO: comments?
                xml::ContentItem::Comment(comment) => {
                    char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data)));
                }
            }
        }
        text.map(|text| {
            if !text.is_empty() {
                content.push_back(Content::Text(text))
            }
        });
        Ok(content)
    }
}

#[cfg(test)]
pub mod test {
    use tokio::io::AsyncRead;

    use super::Reader;

    pub struct MockAsyncReader<'s> {
        put: bool,
        data: &'s str,
    }

    impl<'s> MockAsyncReader<'s> {
        pub fn new(data: &'s str) -> Self {
            Self { put: false, data }
        }
    }

    impl<'s> AsyncRead for MockAsyncReader<'s> {
        fn poll_read(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
            buf: &mut tokio::io::ReadBuf<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            if !self.put {
                buf.put_slice(self.data.as_bytes());
                self.get_mut().put = true
            };
            std::task::Poll::Ready(Ok(()))
        }
    }

    pub const TEST_DOC: &'static str = "<xs:schema
       xmlns:xs='http://www.w3.org/2001/XMLSchema'
       targetNamespace='http://etherx.jabber.org/streams'
       xmlns='http://etherx.jabber.org/streams'
       elementFormDefault='unqualified'>

     <xs:import namespace='jabber:client'/>
     <xs:import namespace='jabber:server'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-sasl'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-streams'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-tls'/>

     <xs:element name='stream'>
       <xs:complexType>
         <xs:sequence xmlns:client='jabber:client'
                      xmlns:server='jabber:server'>
           <xs:element ref='features'
                       minOccurs='0'
                       maxOccurs='1'/>
           <xs:any namespace='urn:ietf:params:xml:ns:xmpp-tls'
                   minOccurs='0'
                   maxOccurs='1'/>
           <xs:any namespace='urn:ietf:params:xml:ns:xmpp-sasl'
                   minOccurs='0'
                   maxOccurs='1'/>
           <xs:any namespace='##other'
                   minOccurs='0'
                   maxOccurs='unbounded'
                   processContents='lax'/>
           <xs:choice minOccurs='0' maxOccurs='1'>
             <xs:choice minOccurs='0' maxOccurs='unbounded'>
               <xs:element ref='client:message'/>
               <xs:element ref='client:presence'/>
               <xs:element ref='client:iq'/>
             </xs:choice>
             <xs:choice minOccurs='0' maxOccurs='unbounded'>
               <xs:element ref='server:message'/>
               <xs:element ref='server:presence'/>
               <xs:element ref='server:iq'/>
             </xs:choice>
           </xs:choice>
           <xs:element ref='error' minOccurs='0' maxOccurs='1'/>
         </xs:sequence>
         <xs:attribute name='from' type='xs:string' use='optional'/>
         <xs:attribute name='id' type='xs:string' use='optional'/>
         <xs:attribute name='to' type='xs:string' use='optional'/>
         <xs:attribute name='version' type='xs:decimal' use='optional'/>
         <xs:attribute ref='xml:lang' use='optional'/>
         <xs:anyAttribute namespace='##other' processContents='lax'/>
       </xs:complexType>
     </xs:element>

     <xs:element name='features'>
       <xs:complexType>
         <xs:sequence>
           <xs:any namespace='##other'
                   minOccurs='0'
                   maxOccurs='unbounded'
                   processContents='lax'/>
         </xs:sequence>
       </xs:complexType>
     </xs:element>

     <xs:element name='error'>
       <xs:complexType>
         <xs:sequence  xmlns:err='urn:ietf:params:xml:ns:xmpp-streams'>
           <xs:group   ref='err:streamErrorGroup'/>
           <xs:element ref='err:text'
                       minOccurs='0'
                       maxOccurs='1'/>
           <xs:any     namespace='##other'
                       minOccurs='0'
                       maxOccurs='1'
                       processContents='lax'/>
         </xs:sequence>
       </xs:complexType>
     </xs:element>

   </xs:schema>asdf";

    #[tokio::test]
    async fn test_element_read() {
        let mock = MockAsyncReader::new(TEST_DOC);
        let mut reader = Reader::new(mock);
        let element = reader.read_element().await.unwrap();
        println!("{:#?}", element);
    }
}