aboutsummaryrefslogblamecommitdiffstats
path: root/filamento/src/logic/process_stanza.rs
blob: cdaff97ce0e57b07a6ec2909edce5503e2c92a66 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12

                      
                                               

                                           
                         





                                       
                           
                                                 

                                        


               
                        
                          
            

                                                                                           

                     

                                                              
               



                       




                                                  


                                                                         









                                                

                                                 
                                                     
                                                          
                                             





                                                                                  
 
                                                                                                                                                     
















                                                                                     













                                                                                                  
                                                                





                                                                                              
                                                            

                 
 







                                                                              



                     




                                                     
                                    





                                                 


























































                                                                                              
             

         

                                                   














































































                                                                                                    
                                             











                                                                       
                                                             

                                                                                        
                                                                         





                                                                                        
                                                                                                  



















                                                                                                                                               
                                                             
 
                                                                       












                                                                                                                                                                                                                                                                 
                                                                                                                                                                
                                                                                                                                                
                                                                                                          



                                                                                                                     


                                                                                                                                                                           


                                                                                                                                                                                                              












                                                                                                                                  













                                                                                                                                                                                                   
                                                                                    











                                                                                                          
                                                                     


                                                                                                                                                        




























































                                                                                                                                                  
                                         
                                     
                                 
                                       

                             


                               




                                                                  
                                                                   


             
                
                                            
































































                                                                                                                                                                                      


                                                   




                                                               
                      
                   

             




                                       

                                            

                          
                                                    


                                     
                                                                                    

         

                                                                                   



                                                                

                                                                        




                                                        
                    
         






                                                                
                                                                    
                                                                           



























                                                                                               




                                                                                        














                                                                                                 




                                                                                    




                                                                                             








                                                                     




                                                                    













                                                                                 




                                                                    


















                                                                                 




                                                            



                                                                     
                                            



                                                                


                                                                     

                                                                                            


                                                                               


                                                                                         


                                                                                              
                                     





                                                                                                    


                                                                                              












                                                                                             


                                                                                                  
                                     





                                                                                        

                                 
                                









                                                                                




                                                                        
                                    
                         
                     
                                                       










                                                                                 




                                                                    


                                                                             
                 
                    









                                                                    




                                                            
                        
             



         

                                                   

                          
                                               


                                                                                          
                                                                            
                                                
                                                   
                                 
                          
         
                                              
                                          
                                           

                                               

          
 
use std::str::FromStr;

use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
use sha1::{Digest, Sha1};
use stanza::{
    client::{
        Stanza,
        iq::{self, Iq, IqType},
    },
    stanza_error::Error as StanzaError,
    xep_0030::{self, info},
    xep_0060::event::{Content, Event, ItemsType},
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::{
    UpdateMessage, caps,
    chat::{Body, Message},
    error::{
        AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
        PresenceError, RosterError,
    },
    files::FileStore,
    presence::{Offline, Online, Presence, PresenceType, Show},
    roster::Contact,
    user::User,
};

use super::ClientLogic;

pub async fn handle_stanza<Fs: FileStore + Clone>(
    logic: ClientLogic<Fs>,
    stanza: Stanza,
    connection: Connected,
) {
    let result = process_stanza(logic.clone(), stanza, connection).await;
    match result {
        Ok(u) => match u {
            _ => {
                if let Some(u) = u {
                    logic.handle_update(u).await
                }
            }
        },
        Err(e) => logic.handle_error(e).await,
    }
}

pub async fn recv_message<Fs: FileStore + Clone>(
    logic: ClientLogic<Fs>,
    stanza_message: stanza::client::message::Message,
) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
    if let Some(from) = stanza_message.from {
        // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
        let timestamp = stanza_message
            .delay
            .map(|delay| delay.stamp)
            .unwrap_or_else(|| Utc::now());
        // TODO: group chat messages

        // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
        // if there is a body, should create chat message
        if let Some(body) = stanza_message.body {
            let message = Message {
                id: stanza_message
                    .id
                    // TODO: proper id xep
                    .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
                    .unwrap_or_else(|| Uuid::new_v4()),
                from: from.as_bare(),
                timestamp,
                body: Body {
                    body: body.body.unwrap_or_default(),
                },
                delivery: None,
            };

            // save the message to the database
            match logic.db().upsert_chat_and_user(&from).await {
                Ok(_) => {
                    if let Err(e) = logic
                        .db()
                        .create_message_with_user_resource(
                            message.clone(),
                            from.clone(),
                            from.clone(),
                        )
                        .await
                    {
                        logic
                            .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
                            .await;
                        error!("failed to upsert chat and user")
                    }
                }
                Err(e) => {
                    logic
                        .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
                        .await;
                    error!("failed to upsert chat and user")
                }
            };

            let from_user = match logic.db().read_user(from.as_bare()).await {
                Ok(u) => u,
                Err(e) => {
                    error!("{}", e);
                    User {
                        jid: from.as_bare(),
                        nick: None,
                        avatar: None,
                    }
                }
            };

            // update the client with the new message
            logic
                .update_sender()
                .send(UpdateMessage::Message {
                    to: from.as_bare(),
                    from: from_user,
                    message,
                })
                .await;
        }

        if let Some(nick) = stanza_message.nick {
            let nick = nick.0;
            if nick.is_empty() {
                match logic.db().delete_user_nick(from.as_bare()).await {
                    Ok(changed) => {
                        if changed {
                            logic
                                .update_sender()
                                .send(UpdateMessage::NickChanged {
                                    jid: from.as_bare(),
                                    nick: None,
                                })
                                .await;
                        }
                    }
                    Err(e) => {
                        logic
                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
                            .await;
                        // if failed, send user update anyway
                        logic
                            .update_sender()
                            .send(UpdateMessage::NickChanged {
                                jid: from.as_bare(),
                                nick: None,
                            })
                            .await;
                    }
                }
            } else {
                match logic
                    .db()
                    .upsert_user_nick(from.as_bare(), nick.clone())
                    .await
                {
                    Ok(changed) => {
                        if changed {
                            logic
                                .update_sender()
                                .send(UpdateMessage::NickChanged {
                                    jid: from.as_bare(),
                                    nick: Some(nick),
                                })
                                .await;
                        }
                    }
                    Err(e) => {
                        logic
                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
                            .await;
                        // if failed, send user update anyway
                        logic
                            .update_sender()
                            .send(UpdateMessage::NickChanged {
                                jid: from.as_bare(),
                                nick: Some(nick),
                            })
                            .await;
                    }
                }
            }
        }

        if let Some(event) = stanza_message.event {
            match event {
                Event::Items(items) => {
                    match items.node.as_str() {
                        "http://jabber.org/protocol/nick" => match items.items {
                            ItemsType::Item(items) => {
                                if let Some(item) = items.first() {
                                    match &item.item {
                                        Some(c) => match c {
                                            Content::Nick(nick) => {
                                                let nick = nick.0.clone();
                                                if nick.is_empty() {
                                                    match logic
                                                        .db()
                                                        .delete_user_nick(from.as_bare())
                                                        .await
                                                    {
                                                        Ok(changed) => {
                                                            if changed {
                                                                logic
                                                                .update_sender()
                                                                .send(UpdateMessage::NickChanged {
                                                                    jid: from.as_bare(),
                                                                    nick: None,
                                                                })
                                                                .await;
                                                            }
                                                        }
                                                        Err(e) => {
                                                            logic
                                                                .handle_error(Error::MessageRecv(
                                                                    MessageRecvError::NickUpdate(e),
                                                                ))
                                                                .await;
                                                            // if failed, send user update anyway
                                                            logic
                                                                .update_sender()
                                                                .send(UpdateMessage::NickChanged {
                                                                    jid: from.as_bare(),
                                                                    nick: None,
                                                                })
                                                                .await;
                                                        }
                                                    }
                                                } else {
                                                    match logic
                                                        .db()
                                                        .upsert_user_nick(
                                                            from.as_bare(),
                                                            nick.clone(),
                                                        )
                                                        .await
                                                    {
                                                        Ok(changed) => {
                                                            if changed {
                                                                logic
                                                                .update_sender()
                                                                .send(UpdateMessage::NickChanged {
                                                                    jid: from.as_bare(),
                                                                    nick: Some(nick),
                                                                })
                                                                .await;
                                                            }
                                                        }
                                                        Err(e) => {
                                                            logic
                                                                .handle_error(Error::MessageRecv(
                                                                    MessageRecvError::NickUpdate(e),
                                                                ))
                                                                .await;
                                                            // if failed, send user update anyway
                                                            logic
                                                                .update_sender()
                                                                .send(UpdateMessage::NickChanged {
                                                                    jid: from.as_bare(),
                                                                    nick: Some(nick),
                                                                })
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                            _ => {}
                                        },
                                        None => {}
                                    }
                                }
                            }
                            _ => {}
                        },
                        "urn:xmpp:avatar:metadata" => {
                            match items.items {
                                ItemsType::Item(items) => {
                                    if let Some(item) = items.first() {
                                        debug!("found item");
                                        match &item.item {
                                            Some(Content::AvatarMetadata(metadata)) => {
                                                debug!("found metadata");
                                                // check if user avatar has been deleted
                                                if let Some(metadata) = metadata
                                                    .info
                                                    .iter()
                                                    .find(|info| info.url.is_none())
                                                {
                                                    debug!("checking if user avatar has changed");
                                                    // check if user avatar has changed
                                                    match logic
                                                        .db()
                                                        .upsert_user_avatar(
                                                            from.as_bare(),
                                                            metadata.id.clone(),
                                                        )
                                                        .await
                                                    {
                                                        Ok((changed, old_avatar)) => {
                                                            if changed {
                                                                if let Some(old_avatar) = old_avatar
                                                                {
                                                                    if let Err(e) = logic
                                                                        .file_store()
                                                                        .delete(&old_avatar)
                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
                                                                    }
                                                                }
                                                            }

                                                            match logic
                                                                    .file_store()
                                                                    .is_stored(&metadata.id)
                                                                    .await
                                                                    .map_err(|err| {
                                                                        AvatarUpdateError::<Fs>::FileStore(
                                                                            err,
                                                                        )
                                                                    }) {
                                                                    Ok(false) => {
                                                                        // get data
                                                                        let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
                                                                        match pep_item {
                                                                            crate::pep::Item::AvatarData(data) => {
                                                                                let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", "");
                                                                                // TODO: these should all be in a separate avatarupdate function
                                                                                debug!("got avatar data");
                                                                                match BASE64_STANDARD.decode(data) {
                                                                                    Ok(data) => {
                                                                                        let mut hasher = Sha1::new();
                                                                                        hasher.update(&data);
                                                                                        let received_data_hash = hex::encode(hasher.finalize());
                                                                                        debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id);
                                                                                        if received_data_hash.to_lowercase() == metadata.id.to_lowercase() {
                                                                                            if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
                                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
                                                                                            }
                                                                                            if changed {
                                                                                                logic
                                                                                                .update_sender()
                                                                                                .send(
                                                                                                    UpdateMessage::AvatarChanged {
                                                                                                        jid: from.as_bare(),
                                                                                                        id: Some(
                                                                                                            metadata.id.clone(),
                                                                                                        ),
                                                                                                    },
                                                                                                )
                                                                                                .await;
                                                                                            }
                                                                                        }
                                                                                    },
                                                                                    Err(e) => {
                                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
                                                                                    },
                                                                                }
                                                                            },
                                                                            _ => {
                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
                                                                            }
                                                                        }
                                                                    }
                                                                    Ok(true) => {
                                                                        // just send the update
                                                                        if changed {
                                                                        logic
                                                                        .update_sender()
                                                                        .send(
                                                                            UpdateMessage::AvatarChanged {
                                                                                jid: from.as_bare(),
                                                                                id: Some(
                                                                                    metadata.id.clone(),
                                                                                ),
                                                                            },
                                                                        )
                                                                        .await;
                                                                    }
                                                                    }
                                                                    Err(e) => {
                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
                                                                    }
                                                            }
                                                        }
                                                        Err(e) => {
                                                            logic
                                                                .handle_error(Error::MessageRecv(
                                                                    MessageRecvError::AvatarUpdate(
                                                                        AvatarUpdateError::Database(
                                                                            e,
                                                                        ),
                                                                    ),
                                                                ))
                                                                .await;
                                                        }
                                                    }
                                                } else {
                                                    // delete avatar
                                                    match logic
                                                        .db()
                                                        .delete_user_avatar(from.as_bare())
                                                        .await
                                                    {
                                                        Ok((changed, old_avatar)) => {
                                                            if changed {
                                                                if let Some(old_avatar) = old_avatar
                                                                {
                                                                    if let Err(e) = logic
                                                                        .file_store()
                                                                        .delete(&old_avatar)
                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
                                                                    }
                                                                }
                                                                logic
                                                                    .update_sender()
                                                                    .send(
                                                                        UpdateMessage::AvatarChanged {
                                                                            jid: from.as_bare(),
                                                                            id: None,
                                                                        },
                                                                    )
                                                                    .await;
                                                            }
                                                        }
                                                        Err(e) => {
                                                            logic
                                                                .handle_error(Error::MessageRecv(
                                                                    MessageRecvError::AvatarUpdate(
                                                                        AvatarUpdateError::Database(
                                                                            e,
                                                                        ),
                                                                    ),
                                                                ))
                                                                .await;
                                                        }
                                                    }
                                                }
                                                // check if the new file is in the file store
                                                // if not, retrieve from server and save in the file store (remember to check if the hash matches)
                                                // send the avatar update
                                            }
                                            _ => {}
                                        }
                                    }
                                }
                                _ => {}
                            }
                        }
                        _ => {}
                    }
                }
                // Event::Collection(collection) => todo!(),
                // Event::Configuration(configuration) => todo!(),
                // Event::Delete(delete) => todo!(),
                // Event::Purge(purge) => todo!(),
                // Event::Subscription(subscription) => todo!(),
                _ => {} // TODO: catch these catch-alls in some way
            }
        }

        Ok(None)
        // TODO: can this be more efficient?
    } else {
        Err(MessageRecvError::MissingFrom)
    }
}

pub async fn recv_presence(
    presence: stanza::client::presence::Presence,
) -> Result<Option<UpdateMessage>, PresenceError> {
    if let Some(from) = presence.from {
        match presence.r#type {
            Some(r#type) => match r#type {
                // error processing a presence from somebody
                stanza::client::presence::PresenceType::Error => {
                    // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
                    Err(PresenceError::StanzaError(
                        presence
                            .errors
                            .first()
                            .cloned()
                            .expect("error MUST have error"),
                    ))
                }
                // should not happen (error to server)
                stanza::client::presence::PresenceType::Probe => {
                    // TODO: should probably write an error and restart stream
                    Err(PresenceError::Unsupported)
                }
                stanza::client::presence::PresenceType::Subscribe => {
                    // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
                    Ok(Some(UpdateMessage::SubscriptionRequest(from)))
                }
                stanza::client::presence::PresenceType::Unavailable => {
                    let offline = Offline {
                        status: presence.status.map(|status| status.status.0),
                    };
                    let timestamp = presence
                        .delay
                        .map(|delay| delay.stamp)
                        .unwrap_or_else(|| Utc::now());
                    Ok(Some(UpdateMessage::Presence {
                        from,
                        presence: Presence {
                            timestamp,
                            presence: PresenceType::Offline(offline),
                        },
                    }))
                }
                // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
                stanza::client::presence::PresenceType::Subscribed => Ok(None),
                stanza::client::presence::PresenceType::Unsubscribe => Ok(None),
                stanza::client::presence::PresenceType::Unsubscribed => Ok(None),
            },
            None => {
                let online = Online {
                    show: presence.show.map(|show| match show {
                        stanza::client::presence::Show::Away => Show::Away,
                        stanza::client::presence::Show::Chat => Show::Chat,
                        stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
                        stanza::client::presence::Show::Xa => Show::ExtendedAway,
                    }),
                    status: presence.status.map(|status| status.status.0),
                    priority: presence.priority.map(|priority| priority.0),
                };
                let timestamp = presence
                    .delay
                    .map(|delay| delay.stamp)
                    .unwrap_or_else(|| Utc::now());
                Ok(Some(UpdateMessage::Presence {
                    from,
                    presence: Presence {
                        timestamp,
                        presence: PresenceType::Online(online),
                    },
                }))
            }
        }
    } else {
        Err(PresenceError::MissingFrom)
    }
}

pub async fn recv_iq<Fs: FileStore + Clone>(
    logic: ClientLogic<Fs>,
    connection: Connected,
    iq: Iq,
) -> Result<Option<UpdateMessage>, IqProcessError> {
    if let Some(to) = &iq.to {
        if *to == *connection.jid() {
        } else {
            return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
        }
    }
    match iq.r#type {
        stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
            let from = iq
                .from
                .clone()
                .unwrap_or_else(|| connection.server().clone());
            let id = iq.id.clone();
            debug!("received iq result with id `{}` from {}", id, from);
            logic
                .pending()
                .respond(Stanza::Iq(iq), id)
                .await
                .map_err(|e| Into::<IqError>::into(e))?;
            Ok(None)
        }
        stanza::client::iq::IqType::Get => {
            let from = iq
                .from
                .clone()
                .unwrap_or_else(|| connection.server().clone());
            if let Some(query) = iq.query {
                match query {
                    stanza::client::iq::Query::DiscoInfo(query) => {
                        info!("received disco#info request from {}", from);
                        let current_caps_node = caps::caps_node();
                        let disco: info::Query =
                            if query.node.is_none() || query.node == Some(current_caps_node) {
                                let mut info = caps::client_info();
                                info.node = query.node;
                                info.into()
                            } else {
                                match logic
                                    .db()
                                    .read_capabilities(&query.node.clone().unwrap())
                                    .await
                                {
                                    Ok(c) => match caps::decode_info_base64(c) {
                                        Ok(mut i) => {
                                            i.node = query.node;
                                            i.into()
                                        }
                                        Err(_e) => {
                                            let iq = Iq {
                                                from: Some(connection.jid().clone()),
                                                id: iq.id,
                                                to: iq.from,
                                                r#type: IqType::Error,
                                                lang: None,
                                                query: Some(iq::Query::DiscoInfo(query)),
                                                errors: vec![StanzaError::ItemNotFound.into()],
                                            };
                                            // TODO: log error
                                            connection
                                                .write_handle()
                                                .write(Stanza::Iq(iq))
                                                .await
                                                .map_err(|e| Into::<IqError>::into(e))?;
                                            info!("replied to disco#info request from {}", from);
                                            return Ok(None);
                                        }
                                    },
                                    Err(_e) => {
                                        let iq = Iq {
                                            from: Some(connection.jid().clone()),
                                            id: iq.id,
                                            to: iq.from,
                                            r#type: IqType::Error,
                                            lang: None,
                                            query: Some(iq::Query::DiscoInfo(query)),
                                            errors: vec![StanzaError::ItemNotFound.into()],
                                        };
                                        // TODO: log error
                                        connection
                                            .write_handle()
                                            .write(Stanza::Iq(iq))
                                            .await
                                            .map_err(|e| Into::<IqError>::into(e))?;
                                        info!("replied to disco#info request from {}", from);
                                        return Ok(None);
                                    }
                                }
                            };
                        let iq = Iq {
                            from: Some(connection.jid().clone()),
                            id: iq.id,
                            to: iq.from,
                            r#type: IqType::Result,
                            lang: None,
                            query: Some(iq::Query::DiscoInfo(disco)),
                            errors: vec![],
                        };
                        connection
                            .write_handle()
                            .write(Stanza::Iq(iq))
                            .await
                            .map_err(|e| Into::<IqError>::into(e))?;
                        info!("replied to disco#info request from {}", from);
                        Ok(None)
                    }
                    _ => {
                        warn!("received unsupported iq get from {}", from);
                        let iq = Iq {
                            from: Some(connection.jid().clone()),
                            id: iq.id,
                            to: iq.from,
                            r#type: IqType::Error,
                            lang: None,
                            query: None,
                            errors: vec![StanzaError::ServiceUnavailable.into()],
                        };
                        connection
                            .write_handle()
                            .write(Stanza::Iq(iq))
                            .await
                            .map_err(|e| Into::<IqError>::into(e))?;
                        warn!("replied to unsupported iq get from {}", from);
                        Ok(None)
                    } // stanza::client::iq::Query::Bind(bind) => todo!(),
                      // stanza::client::iq::Query::DiscoItems(query) => todo!(),
                      // stanza::client::iq::Query::Ping(ping) => todo!(),
                      // stanza::client::iq::Query::Roster(query) => todo!(),
                      // stanza::client::iq::Query::Unsupported => todo!(),
                }
            } else {
                info!("received malformed iq query from {}", from);
                let iq = Iq {
                    from: Some(connection.jid().clone()),
                    id: iq.id,
                    to: iq.from,
                    r#type: IqType::Error,
                    lang: None,
                    query: None,
                    errors: vec![StanzaError::BadRequest.into()],
                };
                connection
                    .write_handle()
                    .write(Stanza::Iq(iq))
                    .await
                    .map_err(|e| Into::<IqError>::into(e))?;
                info!("replied to malformed iq query from {}", from);
                Ok(None)
            }
        }
        stanza::client::iq::IqType::Set => {
            let from = iq
                .from
                .clone()
                .unwrap_or_else(|| connection.server().clone());
            if let Some(query) = iq.query {
                match query {
                    stanza::client::iq::Query::Roster(mut query) => {
                        // TODO: should only have one, otherwise send error
                        // if let Some(item) = query.items.pop() && query.items.len() == 1 {
                        if let Some(item) = query.items.pop() {
                            match item.subscription {
                                Some(stanza::roster::Subscription::Remove) => {
                                    if let Err(e) =
                                        logic.db().delete_contact(item.jid.clone()).await
                                    {
                                        logic
                                            .handle_error(RosterError::Cache(e.into()).into())
                                            .await;
                                    }
                                    Ok(Some(UpdateMessage::RosterDelete(item.jid)))
                                }
                                _ => {
                                    let contact: Contact = item.into();
                                    if let Err(e) = logic.db().upsert_contact(contact.clone()).await
                                    {
                                        logic
                                            .handle_error(RosterError::Cache(e.into()).into())
                                            .await;
                                    }
                                    let iq = Iq {
                                        from: Some(connection.jid().clone()),
                                        id: iq.id,
                                        to: iq.from,
                                        r#type: IqType::Result,
                                        lang: None,
                                        query: None,
                                        errors: vec![],
                                    };
                                    if let Err(e) =
                                        connection.write_handle().write(Stanza::Iq(iq)).await
                                    {
                                        logic
                                            .handle_error(RosterError::PushReply(e.into()).into())
                                            .await;
                                    }
                                    let user = logic
                                        .db()
                                        .read_user(contact.user_jid.clone())
                                        .await
                                        .map_err(|e| Into::<RosterError>::into(e))?;
                                    Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
                                }
                            }
                        } else {
                            warn!("received malformed roster push");
                            let iq = Iq {
                                from: Some(connection.jid().clone()),
                                id: iq.id,
                                to: iq.from,
                                r#type: IqType::Error,
                                lang: None,
                                query: None,
                                errors: vec![StanzaError::NotAcceptable.into()],
                            };
                            connection
                                .write_handle()
                                .write(Stanza::Iq(iq))
                                .await
                                .map_err(|e| Into::<IqError>::into(e))?;
                            Ok(None)
                        }
                    }
                    // TODO: send unsupported to server
                    _ => {
                        warn!("received unsupported iq set from {}", from);
                        let iq = Iq {
                            from: Some(connection.jid().clone()),
                            id: iq.id,
                            to: iq.from,
                            r#type: IqType::Error,
                            lang: None,
                            query: None,
                            errors: vec![StanzaError::ServiceUnavailable.into()],
                        };
                        connection
                            .write_handle()
                            .write(Stanza::Iq(iq))
                            .await
                            .map_err(|e| Into::<IqError>::into(e))?;
                        warn!("replied to unsupported iq set from {}", from);
                        Ok(None)
                    }
                }
            } else {
                warn!("received malformed iq set from {}", from);
                let iq = Iq {
                    from: Some(connection.jid().clone()),
                    id: iq.id,
                    to: iq.from,
                    r#type: IqType::Error,
                    lang: None,
                    query: None,
                    errors: vec![StanzaError::NotAcceptable.into()],
                };
                connection
                    .write_handle()
                    .write(Stanza::Iq(iq))
                    .await
                    .map_err(|e| Into::<IqError>::into(e))?;
                Ok(None)
            }
        }
    }
}

pub async fn process_stanza<Fs: FileStore + Clone>(
    logic: ClientLogic<Fs>,
    stanza: Stanza,
    connection: Connected,
) -> Result<Option<UpdateMessage>, Error<Fs>> {
    let update = match stanza {
        Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
        Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
        Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?),
        // unreachable, always caught by lampada
        // TODO: make cleaner than this in some way
        Stanza::Error(error) => {
            unreachable!()
        }
        // should this cause a stream restart?
        Stanza::OtherContent(content) => {
            Err(Error::UnrecognizedContent)
            // TODO: send error to write_thread
        }
    };
    update
}