aboutsummaryrefslogblamecommitdiffstats
path: root/src/reader.rs
blob: dc16d318d5952a35a8598633b6f1b8cd8f12c115 (plain) (tree)
1
2
3
4
5
6
7
8
                     
                                 
             

                                              
                   
                 
                    




                                                                        

            
                                                 
                 

                                 



                                                                                
             
                   
                                                
                                               
                     
                                        

 



                                   
                                                           





                                   



                         
                                                       


                                                        






























































                                                                                                   
                                                                
              

                                                            







                                                                                          






















                                                                                    
                     






                                                                                               









                                                                                    










































































































































                                                                                                    










































































































































































































































































                                                                                                         
                   


     











                                                           


            
                                           






















                                                   
                                              












































































                                                                        




                                                  



                                                           











                                                             
 
use circular::Buffer;
use futures::{FutureExt, Stream};
use nom::Err;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    future::Future,
    path::Prefix,
    pin::{pin, Pin},
    str::{self, FromStr},
};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt};

/// Capacity, in bytes, given to the internal buffer of every `Reader` created by `Reader::new`.
static MAX_STANZA_SIZE: usize = 65536;

use crate::{
    element::{Content, Element, Name, Namespace},
    error::Error,
    xml::{self, parsers::Parser},
    Result,
};

/// Streaming XML reader that tracks the stack of open elements and the namespace
/// declarations available at the current depth.
pub struct Reader<R> {
    /// Underlying source that bytes are read from.
    inner: R,
    /// Bytes read from `inner` that no parser has consumed yet.
    buffer: Buffer,
    /// Names of the elements that are currently open, outermost first. An entry is
    /// pushed by `start_tag_from_xml` and popped by `end_tag_from_xml`.
    depth: Vec<Name>,
    /// Namespace declarations made by each element that is currently being processed,
    /// outermost first.
    namespaces: Vec<HashSet<Namespace>>,
}

impl<R> Reader<R> {
    /// Wraps `reader` in a [`Reader`] that starts with no open elements, no namespaces
    /// in scope, and an empty buffer of `MAX_STANZA_SIZE` bytes.
    pub fn new(reader: R) -> Self {
        let buffer = Buffer::with_capacity(MAX_STANZA_SIZE);
        let depth = Vec::new();
        let namespaces = Vec::new();
        Self {
            inner: reader,
            buffer,
            depth,
            namespaces,
        }
    }
}

impl<R> Reader<R>
where
    R: AsyncRead + Unpin,
{
    /// Reads more bytes from the underlying reader into the internal buffer.
    ///
    /// Returns the number of bytes that were appended. Per tokio's
    /// `AsyncReadExt::read_buf`, `0` means the source reached end of stream or the
    /// buffer had no spare capacity left.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error reported by the underlying reader.
    async fn read_buf(&mut self) -> Result<usize> {
        let read = self.inner.read_buf(&mut self.buffer).await?;
        Ok(read)
    }

    async fn read_prolog<'s>(&'s mut self) -> Result<()> {
        loop {
            self.read_buf().await?;
            let input = str::from_utf8(self.buffer.data())?;
            match xml::Prolog::parse(input) {
                Ok((rest, _prolog)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    self.buffer.consume(len);
                    return Ok(());
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {}
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    async fn read_start_tag<'s>(&'s mut self) -> Result<Element> {
        loop {
            self.read_buf().await?;
            let input = str::from_utf8(self.buffer.data())?;
            match xml::STag::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    let element =
                        Reader::<R>::start_tag_from_xml(&mut self.depth, &mut self.namespaces, e)?;
                    self.buffer.consume(len);
                    return Ok(element);
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {}
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    async fn read_end_tag<'s>(&'s mut self) -> Result<()> {
        loop {
            self.read_buf().await?;
            let input = str::from_utf8(self.buffer.data())?;
            match xml::ETag::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    Reader::<R>::end_tag_from_xml(&mut self.depth, &mut self.namespaces, e)?;
                    self.buffer.consume(len);
                    return Ok(());
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {}
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    async fn read_element<'s>(&'s mut self) -> Result<Element> {
        loop {
            self.read_buf().await?;
            let input = str::from_utf8(self.buffer.data())?;
            match xml::Element::parse(input) {
                Ok((rest, e)) => {
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    let element = Reader::<R>::element_from_xml(&mut self.namespaces, e)?;
                    self.buffer.consume(len);
                    return Ok(element);
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {}
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }

    async fn read_content<'s>(&'s mut self) -> Result<Content> {
        loop {
            self.read_buf().await?;
            let input = str::from_utf8(self.buffer.data())?;

            match xml::ContentItem::parse(input) {
                Ok((rest, c)) => {
                    match c {
                        xml::ContentItem::CharData(char_data) => todo!(),
                        xml::ContentItem::Element(element) => todo!(),
                        xml::ContentItem::Reference(reference) => todo!(),
                        xml::ContentItem::CDSect(cdsect) => todo!(),
                        xml::ContentItem::PI(pi) => todo!(),
                        xml::ContentItem::Comment(comment) => todo!(),
                    }
                    let len = self.buffer.available_data() - rest.as_bytes().len();
                    let content = Reader::<R>::content_item_from_xml(&mut self.namespaces, e)?;
                    self.buffer.consume(len);
                    return Ok(element);
                }
                std::result::Result::Err(e) => match e {
                    Err::Incomplete(_) => {}
                    // TODO: better error
                    Err::Error(e) => return Err(Error::ParseError(e.to_string())),
                    Err::Failure(e) => return Err(Error::ParseError(e.to_string())),
                },
            }
        }
    }
}

impl<R> Reader<R> {
    /// Intended to convert a single parsed content item into a [`Content`] value.
    ///
    /// Not implemented yet: the body is `todo!()`, so every call panics and neither
    /// `namespaces` nor `item` is used.
    fn content_item_from_xml(
        namespaces: &mut Vec<HashSet<Namespace>>,
        item: xml::ContentItem,
    ) -> Result<Content> {
        todo!()
    }

    /// Converts a parsed start tag into an [`Element`] with empty content and opens it.
    ///
    /// The tag's namespace declarations are collected first. Attribute names and the
    /// element name are then resolved against every namespace in scope, innermost
    /// declaration winning. Unprefixed names (attributes included) are resolved against
    /// the default namespace.
    ///
    /// `depth` and `namespaces` are only modified when the whole conversion succeeded:
    /// the element's name is pushed onto `depth` and its declarations onto `namespaces`.
    ///
    /// # Errors
    ///
    /// * `Error::DuplicateNameSpace` if the tag declares the same prefix (or the default
    ///   namespace) more than once;
    /// * `Error::DuplicateAttribute` if two attributes resolve to the same name;
    /// * `Error::UnqualifiedNamespace` if an attribute or the element name cannot be
    ///   resolved to a namespace in scope;
    /// * any error produced while processing an attribute or namespace value.
    fn start_tag_from_xml(
        depth: &mut Vec<Name>,
        namespaces: &mut Vec<HashSet<Namespace>>,
        s_tag: xml::STag,
    ) -> Result<Element> {
        let mut namespace_declarations: HashSet<Namespace> = HashSet::new();
        for attribute in s_tag.attributes.iter() {
            let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute else {
                continue;
            };
            let prefix = match ns_name {
                xml::NSAttName::PrefixedAttName(prefixed_att_name) => {
                    Some(prefixed_att_name.to_string())
                }
                xml::NSAttName::DefaultAttName => None,
            };
            let namespace = Namespace {
                prefix,
                namespace: value.process()?,
            };
            // A prefix may be bound only once per tag. Comparing the prefix alone also
            // rejects the same prefix bound to two different URIs, which would otherwise
            // be resolved according to the set's arbitrary iteration order.
            if namespace_declarations
                .iter()
                .any(|declared| declared.prefix == namespace.prefix)
            {
                return Err(Error::DuplicateNameSpace(namespace));
            }
            namespace_declarations.insert(namespace);
        }

        // all namespaces available to the element (from both parent elements and element itself)
        let namespace_stack: Vec<&Namespace> = namespaces
            .iter()
            .flatten()
            .chain(namespace_declarations.iter())
            .collect();

        let mut attributes = HashMap::new();

        for attribute in s_tag.attributes.iter() {
            let xml::Attribute::Attribute {
                name: q_name,
                value,
            } = attribute
            else {
                continue;
            };
            // Search from the back so that the innermost declaration wins.
            let (namespace, attribute_name) = match q_name {
                xml::QName::PrefixedName(prefixed_name) => (
                    namespace_stack.iter().rfind(|namespace| {
                        namespace.prefix.as_deref() == Some(**prefixed_name.prefix)
                    }),
                    prefixed_name.local_part.to_string(),
                ),
                xml::QName::UnprefixedName(unprefixed_name) => (
                    namespace_stack
                        .iter()
                        .rfind(|namespace| namespace.prefix.is_none()),
                    unprefixed_name.to_string(),
                ),
            };
            let Some(namespace) = namespace else {
                return Err(Error::UnqualifiedNamespace(q_name.to_string()));
            };
            let name = Name {
                namespace: (*namespace).clone(),
                name: attribute_name,
            };
            if attributes.insert(name, value.process()?).is_some() {
                return Err(Error::DuplicateAttribute(q_name.to_string()));
            }
        }

        let (namespace, name) = match &s_tag.name {
            xml::QName::PrefixedName(prefixed_name) => (
                namespace_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix.as_deref() == Some(**prefixed_name.prefix)),
                prefixed_name.local_part.to_string(),
            ),
            xml::QName::UnprefixedName(unprefixed_name) => (
                namespace_stack
                    .iter()
                    .rfind(|namespace| namespace.prefix.is_none()),
                unprefixed_name.to_string(),
            ),
        };

        let namespace = (*namespace
            .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))?)
        .clone();

        let name = Name { namespace, name };

        // Nothing can fail past this point, so the reader state is only touched for
        // tags that were fully accepted.
        depth.push(name.clone());
        namespaces.push(namespace_declarations.clone());

        Ok(Element {
            name,
            namespace_decl: namespace_declarations,
            attributes,
            content: Vec::new(),
        })
    }

    /// Closes the innermost open element with the parsed end tag `e_tag`.
    ///
    /// The end tag matches when its prefix and local part equal the prefix and name
    /// recorded for the innermost open element. On a match the element is popped from
    /// `depth` and its namespace declarations are popped from `namespaces`. On any
    /// error both stacks are left untouched, so they stay in step with each other.
    ///
    /// # Errors
    ///
    /// * `Error::NotInElement` if no element is open;
    /// * `Error::MismatchedEndTag` if the end tag does not match the innermost open
    ///   element.
    fn end_tag_from_xml(
        depth: &mut Vec<Name>,
        namespaces: &mut Vec<HashSet<Namespace>>,
        e_tag: xml::ETag,
    ) -> Result<()> {
        // Peek instead of pop: a rejected end tag must not close anything.
        let Some(open) = depth.last() else {
            return Err(Error::NotInElement(e_tag.name.to_string()));
        };

        let is_match = open.namespace.prefix.as_deref() == e_tag.name.prefix()
            && open.name == e_tag.name.local_part();
        if !is_match {
            return Err(Error::MismatchedEndTag(
                open.name.clone(),
                e_tag.name.to_string(),
            ));
        }

        depth.pop();
        namespaces.pop();
        Ok(())
    }

    fn element_from_xml(
        namespaces: &mut Vec<HashSet<Namespace>>,
        element: xml::Element,
    ) -> Result<Element> {
        match element {
            xml::Element::Empty(empty_elem_tag) => {
                let mut namespace_declarations = HashSet::new();
                for (prefix, namespace) in
                    empty_elem_tag.attributes.iter().filter_map(|attribute| {
                        if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute {
                            Some((ns_name, value))
                        } else {
                            None
                        }
                    })
                {
                    let prefix = match prefix {
                        xml::NSAttName::PrefixedAttName(prefixed_att_name) => {
                            Some(prefixed_att_name.to_string())
                        }
                        xml::NSAttName::DefaultAttName => None,
                    };
                    let namespace = Namespace {
                        prefix,
                        namespace: namespace.process()?,
                    };
                    if !namespace_declarations.insert(namespace.clone()) {
                        return Err(Error::DuplicateNameSpace(namespace));
                    }
                }

                // all namespaces available to the element (from both parent elements and element itself)
                let namespace_stack: Vec<&Namespace> = namespaces
                    .iter()
                    .flatten()
                    .chain(namespace_declarations.iter())
                    .collect();

                let mut attributes = HashMap::new();

                for (q_name, value) in empty_elem_tag.attributes.iter().filter_map(|attribute| {
                    if let xml::Attribute::Attribute { name, value } = attribute {
                        Some((name, value))
                    } else {
                        None
                    }
                }) {
                    let namespace;
                    let attribute_name;
                    match q_name {
                        xml::QName::PrefixedName(prefixed_name) => {
                            namespace = namespace_stack.iter().rfind(|namespace| {
                                namespace.prefix.as_deref() == Some(**prefixed_name.prefix)
                            });
                            attribute_name = prefixed_name.local_part.to_string();
                        }
                        xml::QName::UnprefixedName(unprefixed_name) => {
                            namespace = namespace_stack
                                .iter()
                                .rfind(|namespace| namespace.prefix == None);
                            attribute_name = unprefixed_name.to_string();
                        }
                    }
                    if let Some(namespace) = namespace {
                        let namespace = (*namespace).clone();
                        let name = Name {
                            namespace,
                            name: attribute_name,
                        };
                        let value = value.process()?;
                        if let Some(_value) = attributes.insert(name, value) {
                            return Err(Error::DuplicateAttribute(q_name.to_string()));
                        }
                    } else {
                        return Err(Error::UnqualifiedNamespace(q_name.to_string()));
                    }
                }

                let name;
                let namespace;
                match &empty_elem_tag.name {
                    xml::QName::PrefixedName(prefixed_name) => {
                        namespace = namespace_stack.iter().rfind(|namespace| {
                            namespace.prefix.as_deref() == Some(**prefixed_name.prefix)
                        });
                        name = prefixed_name.local_part.to_string();
                    }
                    xml::QName::UnprefixedName(unprefixed_name) => {
                        namespace = namespace_stack
                            .iter()
                            .rfind(|namespace| namespace.prefix == None);
                        name = unprefixed_name.to_string();
                    }
                }

                let namespace = (*namespace
                    .ok_or_else(|| Error::UnqualifiedNamespace(empty_elem_tag.name.to_string()))?)
                .clone();

                let name = Name { namespace, name };

                return Ok(Element {
                    name,
                    namespace_decl: namespace_declarations,
                    attributes,
                    content: Vec::new(),
                });
            }
            xml::Element::NotEmpty(s_tag, content, e_tag) => {
                if s_tag.name != e_tag.name {
                    return Err(Error::MismatchedEndTag(
                        s_tag.name.to_string(),
                        e_tag.name.to_string(),
                    ));
                }
                let mut namespace_declarations = HashSet::new();
                for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| {
                    if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute {
                        Some((ns_name, value))
                    } else {
                        None
                    }
                }) {
                    let prefix = match prefix {
                        xml::NSAttName::PrefixedAttName(prefixed_att_name) => {
                            Some(prefixed_att_name.to_string())
                        }
                        xml::NSAttName::DefaultAttName => None,
                    };
                    let namespace = Namespace {
                        prefix,
                        namespace: namespace.process()?,
                    };
                    if !namespace_declarations.insert(namespace.clone()) {
                        return Err(Error::DuplicateNameSpace(namespace));
                    }
                }

                // all namespaces available to the element (from both parent elements and element itself)
                let namespace_stack: Vec<&Namespace> = namespaces
                    .iter()
                    .flatten()
                    .chain(namespace_declarations.iter())
                    .collect();

                let mut attributes = HashMap::new();

                for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| {
                    if let xml::Attribute::Attribute { name, value } = attribute {
                        Some((name, value))
                    } else {
                        None
                    }
                }) {
                    let namespace;
                    let attribute_name;
                    match q_name {
                        xml::QName::PrefixedName(prefixed_name) => {
                            namespace = namespace_stack.iter().rfind(|namespace| {
                                namespace.prefix.as_deref() == Some(**prefixed_name.prefix)
                            });
                            attribute_name = prefixed_name.local_part.to_string();
                        }
                        xml::QName::UnprefixedName(unprefixed_name) => {
                            namespace = namespace_stack
                                .iter()
                                .rfind(|namespace| namespace.prefix == None);
                            attribute_name = unprefixed_name.to_string();
                        }
                    }
                    if let Some(namespace) = namespace {
                        let namespace = (*namespace).clone();
                        let name = Name {
                            namespace,
                            name: attribute_name,
                        };
                        let value = value.process()?;
                        if let Some(_value) = attributes.insert(name, value) {
                            return Err(Error::DuplicateAttribute(q_name.to_string()));
                        }
                    } else {
                        return Err(Error::UnqualifiedNamespace(q_name.to_string()));
                    }
                }

                let name;
                let namespace;
                match &s_tag.name {
                    xml::QName::PrefixedName(prefixed_name) => {
                        namespace = namespace_stack.iter().rfind(|namespace| {
                            namespace.prefix.as_deref() == Some(**prefixed_name.prefix)
                        });
                        name = prefixed_name.local_part.to_string();
                    }
                    xml::QName::UnprefixedName(unprefixed_name) => {
                        namespace = namespace_stack
                            .iter()
                            .rfind(|namespace| namespace.prefix == None);
                        name = unprefixed_name.to_string();
                    }
                }

                let namespace = (*namespace
                    .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))?)
                .clone();

                let name = Name { namespace, name };

                namespaces.push(namespace_declarations.clone());

                let content = Self::content_from_xml(namespaces, content)?;

                namespaces.pop();

                return Ok(Element {
                    name,
                    namespace_decl: namespace_declarations,
                    attributes,
                    content,
                });
            }
        }
    }

    /// Converts the parsed content of an element into a list of [`Content`] values.
    ///
    /// Character data, resolved references and CDATA sections that are not separated by
    /// a child element are merged into a single `Content::Text`. Each child element is
    /// converted with `element_from_xml` and closes the text run in front of it.
    /// Processing instructions and comments produce no output of their own, but the
    /// character data that follows them is kept.
    ///
    /// # Errors
    ///
    /// Returns any error produced while resolving a reference or converting a child
    /// element.
    fn content_from_xml(
        namespaces: &mut Vec<HashSet<Namespace>>,
        element: xml::Content,
    ) -> Result<Vec<Content>> {
        let mut content = Vec::new();
        // Text collected since the last child element (or since the start).
        let mut text = element.char_data.map(|chars| String::from(*chars));
        for (content_item, char_data) in element.content {
            match content_item {
                xml::ContentItem::Element(element) => {
                    if let Some(text) = text.take() {
                        content.push(Content::Text(text));
                    }
                    content.push(Content::Element(Self::element_from_xml(
                        namespaces, element,
                    )?));
                    // The character data after a child element starts a new text run.
                    text = char_data.map(|chars| String::from(*chars));
                    continue;
                }
                xml::ContentItem::Reference(reference) => {
                    let data = reference.process()?;
                    text.get_or_insert_with(String::new).push(data);
                }
                xml::ContentItem::CDSect(cd_sect) => {
                    text.get_or_insert_with(String::new).push_str(**cd_sect);
                }
                // TODO: is this important?
                xml::ContentItem::PI(_) => {}
                // TODO: comments?
                xml::ContentItem::Comment(_) => {}
            }
            // Character data following a non-element item continues the current text
            // run. A run is started here if none exists yet, so that text directly
            // after a leading comment or processing instruction is not lost.
            if let Some(char_data) = char_data {
                text.get_or_insert_with(String::new).push_str(*char_data);
            }
        }
        if let Some(text) = text {
            content.push(Content::Text(text));
        }
        Ok(content)
    }
}

impl<R: AsyncRead + Unpin> Stream for Reader<R> {
    type Item = Result<Content>;

    /// Polls for the next content item.
    ///
    /// A fresh `read_content` future is created, polled once and dropped on every call.
    /// A ready result is always wrapped in `Some`, so this stream never yields `None`.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let reader = self.get_mut();
        let read = pin!(reader.read_content());
        read.poll(cx).map(Some)
    }
}

#[cfg(test)]
mod test {
    use futures::{sink::Buffer, StreamExt};
    use tokio::io::AsyncRead;

    use super::Reader;

    /// In-memory [`AsyncRead`] used by the tests: serves the bytes of a string exactly
    /// once and reports end of stream afterwards.
    ///
    /// The field holds the bytes that have not been handed out yet.
    struct MockAsyncReader<'s>(&'s [u8]);

    impl<'s> MockAsyncReader<'s> {
        /// Creates a reader that will serve the bytes of `data`.
        fn new(data: &'s str) -> Self {
            Self(data.as_bytes())
        }
    }

    impl<'s> AsyncRead for MockAsyncReader<'s> {
        /// Copies as many of the remaining bytes as fit into `buf` and is always ready.
        ///
        /// Once every byte has been served nothing is written to `buf`, which callers
        /// observe as end of stream.
        fn poll_read(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
            buf: &mut tokio::io::ReadBuf<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            let this = self.get_mut();
            let remaining: &'s [u8] = this.0;
            // `put_slice` panics when given more than `buf` can hold, so never hand it
            // more than the space that is left.
            let len = remaining.len().min(buf.remaining());
            let (chunk, rest) = remaining.split_at(len);
            buf.put_slice(chunk);
            this.0 = rest;
            std::task::Poll::Ready(Ok(()))
        }
    }

    /// XML Schema document fed to the reader by the tests in this module.
    ///
    /// NOTE(review): the literal ends with the stray text `asdf` after the closing root
    /// tag — presumably to exercise input that follows a complete element; confirm.
    const TEST_DOC: &'static str = "<xs:schema
       xmlns:xs='http://www.w3.org/2001/XMLSchema'
       targetNamespace='http://etherx.jabber.org/streams'
       xmlns='http://etherx.jabber.org/streams'
       elementFormDefault='unqualified'>

     <xs:import namespace='jabber:client'/>
     <xs:import namespace='jabber:server'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-sasl'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-streams'/>
     <xs:import namespace='urn:ietf:params:xml:ns:xmpp-tls'/>

     <xs:element name='stream'>
       <xs:complexType>
         <xs:sequence xmlns:client='jabber:client'
                      xmlns:server='jabber:server'>
           <xs:element ref='features'
                       minOccurs='0'
                       maxOccurs='1'/>
           <xs:any namespace='urn:ietf:params:xml:ns:xmpp-tls'
                   minOccurs='0'
                   maxOccurs='1'/>
           <xs:any namespace='urn:ietf:params:xml:ns:xmpp-sasl'
                   minOccurs='0'
                   maxOccurs='1'/>
           <xs:any namespace='##other'
                   minOccurs='0'
                   maxOccurs='unbounded'
                   processContents='lax'/>
           <xs:choice minOccurs='0' maxOccurs='1'>
             <xs:choice minOccurs='0' maxOccurs='unbounded'>
               <xs:element ref='client:message'/>
               <xs:element ref='client:presence'/>
               <xs:element ref='client:iq'/>
             </xs:choice>
             <xs:choice minOccurs='0' maxOccurs='unbounded'>
               <xs:element ref='server:message'/>
               <xs:element ref='server:presence'/>
               <xs:element ref='server:iq'/>
             </xs:choice>
           </xs:choice>
           <xs:element ref='error' minOccurs='0' maxOccurs='1'/>
         </xs:sequence>
         <xs:attribute name='from' type='xs:string' use='optional'/>
         <xs:attribute name='id' type='xs:string' use='optional'/>
         <xs:attribute name='to' type='xs:string' use='optional'/>
         <xs:attribute name='version' type='xs:decimal' use='optional'/>
         <xs:attribute ref='xml:lang' use='optional'/>
         <xs:anyAttribute namespace='##other' processContents='lax'/>
       </xs:complexType>
     </xs:element>

     <xs:element name='features'>
       <xs:complexType>
         <xs:sequence>
           <xs:any namespace='##other'
                   minOccurs='0'
                   maxOccurs='unbounded'
                   processContents='lax'/>
         </xs:sequence>
       </xs:complexType>
     </xs:element>

     <xs:element name='error'>
       <xs:complexType>
         <xs:sequence  xmlns:err='urn:ietf:params:xml:ns:xmpp-streams'>
           <xs:group   ref='err:streamErrorGroup'/>
           <xs:element ref='err:text'
                       minOccurs='0'
                       maxOccurs='1'/>
           <xs:any     namespace='##other'
                       minOccurs='0'
                       maxOccurs='1'
                       processContents='lax'/>
         </xs:sequence>
       </xs:complexType>
     </xs:element>

   </xs:schema>asdf";

    /// Reads `TEST_DOC` as a single element and prints the result; panics if reading
    /// fails.
    #[tokio::test]
    async fn test_element_read() {
        let mut reader = Reader::new(MockAsyncReader::new(TEST_DOC));
        let parsed = reader.read_element().await.unwrap();
        println!("{:#?}", parsed);
    }

    /// Reads the root start tag of `TEST_DOC`, then prints every item produced by the
    /// reader's `Stream` implementation.
    ///
    /// NOTE(review): the loop has no exit condition, so this test only stops by
    /// panicking — confirm the intended termination.
    #[tokio::test]
    async fn test_element_stream() {
        let mut reader = Reader::new(MockAsyncReader::new(TEST_DOC));
        let root = reader.read_start_tag().await.unwrap();
        println!("{:#?}", root);
        loop {
            let item = reader.next().await.unwrap();
            println!("{:#?}", item);
        }
    }
}