aboutsummaryrefslogblamecommitdiffstats
path: root/filamento/src/logic/online.rs
blob: 767f92344406e9fbb647a223783dcac85ca81b1f (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                      

                                               
                
                       
             
                                                          
                         

             
                                             
                                                                                                                                  

                         
                                  


               

                                                                                                                                                             
                                                                                                                                              

  
            
                 
                                                                                                                                                                                                                                                                                             
                  
  
 
                                                                                                                        






                                                                         

                                                      


































                                                                                             
              




















                                                             
                                                                                                

                                                                 
                          
                                                
























































                                                                                             
     

 

                                                       























































                                                                

                                                         




                                                                        
                                                                        

                                                                        

                            


                                                                        

                                                                         
                            




                                                     

                                                                

                          
                                 
                                            

                                                                        
                                                                        

                                                                        

                            




                                                     

                                                                

                          
                                 
                                                                        

                                                                         
                            

                                                     

                                                                        
                                                                        

                                                                        

                            




                                                     

                                                                       

                          


                                                                        
                                                                        

                                                                        

                            









                                                                        

                                                                          
                            






                                                                                                    

                                                                           
                            






                                                                                                 

                                                                          
                            


                                                                        

                                                                           
                            




                                                     

                                                          
























































                                                                                   

                                                          
























































                                                                

             



                                                   

                                                      


                              
                                                           






                                                          
                                                                                                                               
                                                                                                                                                                       









                                                                                       






















                                                                                    

                               




















                                                                                                                                                  







                                                                         



             





                                              
                 




                                                                           





                                                           



                                                  




                                                                                 

                            
       




                                      
               







                                                        
                                      


                                     
                                                  












                                                                                           
                              




                                                                        















                                                                           

                                                      

                          
                         








                                                  
                 

                                   
                                   




















































                                                                                                                                                                                     

                                                       


































































                                                                                                                                                                                     

                                                            


                          
                           

                                        













































                                                                                                                                                                                                                                                             































                                                                                                                                                                                     
                                                                       

                                
                                                       


                                      
                                                           



                                        
                                              



                                                                       
                                                


     





















































































                                                                                                                                                                                                              



                                                                                      








                                                                                                                                                    

                                                                                       







                                                         
                                                







                                                                                                                                                          
                                                                                                                                                                                                     













































































                                                                                                                                                                                     
                                  


                                                         
                          
                            




                                                                    
                                                       
                                                                               

                                               
                                      

                                                      

                                             

                                                              

                                                               

                                                                                   
         



                                                                                             
                                          

                                                         

                                              

                                                                 
         



                                                                            
                                             

                                                              

                                                 

                                                                   

                                          

                                                         
         
                                             

                                                                          

                                               
                                                                            
                                        

                                                      
                                                                                   


                                                     
                                                                                   
                                        

                                                            
                                                                                          


                                                         
                                                                                


                                                     
                                                                           


                                                  

                                                                        

                                                

                                                                             

                                                                

                                                                                             

                                               
                                                                            

                                        

                                                                    

                                                         
                                                                               

                                        





                                                                                

                                        

                                                                                      





                                                               











                                                                                     
     
          
 
use std::{io::Cursor, time::Duration};

use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
use image::ImageReader;
use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
use sha1::{Digest, Sha1};
use stanza::{
    client::{
        iq::{self, Iq, IqType, Query}, Stanza
    }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay
};
use tokio::sync::oneshot;
use tracing::{debug, error, info};
use uuid::Uuid;

use crate::{
    avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
        AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
    }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};

use super::{
    local_only::{
        handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user
    }, ClientLogic
};

pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
    let result = handle_online_result(&logic, command, connection).await;
    match result {
        Ok(_) => {}
        Err(e) => logic.handle_error(e).await,
    }
}

pub async fn handle_get_roster<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
) -> Result<Vec<Contact>, RosterError> {
    let iq_id = Uuid::new_v4().to_string();
    let stanza = Stanza::Iq(Iq {
        from: Some(connection.jid().clone()),
        id: iq_id.to_string(),
        to: None,
        r#type: IqType::Get,
        lang: None,
        query: Some(iq::Query::Roster(stanza::roster::Query {
            ver: None,
            items: Vec::new(),
        })),
        errors: Vec::new(),
    });
    let response = logic
        .pending()
        .request(&connection, stanza, iq_id.clone())
        .await?;
    // TODO: timeout
    match response {
        Stanza::Iq(Iq {
            from: _,
            id,
            to: _,
            r#type,
            lang: _,
            query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
            errors: _,
        }) if id == iq_id && r#type == IqType::Result => {
            let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
            if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
                logic
                    .handle_error(Error::Roster(RosterError::Cache(e.into())))
                    .await;
            };
            Ok(contacts)
        }
        ref s @ Stanza::Iq(Iq {
            from: _,
            ref id,
            to: _,
            r#type,
            lang: _,
            query: _,
            ref errors,
        }) if *id == iq_id && r#type == IqType::Error => {
            if let Some(error) = errors.first() {
                Err(RosterError::StanzaError(error.clone()))
            } else {
                Err(RosterError::UnexpectedStanza(s.clone()))
            }
        }
        s => Err(RosterError::UnexpectedStanza(s)),
    }
}

// this can't query the client... otherwise there is a hold-up and the connection can't complete
pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
) -> Result<Vec<(Contact, User)>, RosterError> {
    let iq_id = Uuid::new_v4().to_string();
    let stanza = Stanza::Iq(Iq {
        from: Some(connection.jid().clone()),
        id: iq_id.to_string(),
        to: None,
        r#type: IqType::Get,
        lang: None,
        query: Some(iq::Query::Roster(stanza::roster::Query {
            ver: None,
            items: Vec::new(),
        })),
        errors: Vec::new(),
    });
    let response = logic
        .pending()
        .request(&connection, stanza, iq_id.clone())
        .await?;
    // TODO: timeout
    match response {
        Stanza::Iq(Iq {
            from: _,
            id,
            to: _,
            r#type,
            lang: _,
            query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
            errors: _,
        }) if id == iq_id && r#type == IqType::Result => {
            let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
            if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
                logic
                    .handle_error(Error::Roster(RosterError::Cache(e.into())))
                    .await;
            };
            let mut users = Vec::new();
            for contact in &contacts {
                let user = logic.db().read_user(contact.user_jid.clone()).await?;
                users.push(user);
            }
            Ok(contacts.into_iter().zip(users).collect())
        }
        ref s @ Stanza::Iq(Iq {
            from: _,
            ref id,
            to: _,
            r#type,
            lang: _,
            query: _,
            ref errors,
        }) if *id == iq_id && r#type == IqType::Error => {
            if let Some(error) = errors.first() {
                Err(RosterError::StanzaError(error.clone()))
            } else {
                Err(RosterError::UnexpectedStanza(s.clone()))
            }
        }
        s => Err(RosterError::UnexpectedStanza(s)),
    }
}

pub async fn handle_add_contact<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), RosterError> {
    let iq_id = Uuid::new_v4().to_string();
    let set_stanza = Stanza::Iq(Iq {
        from: Some(connection.jid().clone()),
        id: iq_id.clone(),
        to: None,
        r#type: IqType::Set,
        lang: None,
        query: Some(iq::Query::Roster(stanza::roster::Query {
            ver: None,
            items: vec![stanza::roster::Item {
                approved: None,
                ask: false,
                jid,
                name: None,
                subscription: None,
                groups: Vec::new(),
            }],
        })),
        errors: Vec::new(),
    });
    let response = logic
        .pending()
        .request(&connection, set_stanza, iq_id.clone())
        .await?;
    match response {
        Stanza::Iq(Iq {
            from: _,
            id,
            to: _,
            r#type,
            lang: _,
            query: _,
            errors: _,
        }) if id == iq_id && r#type == IqType::Result => Ok(()),
        ref s @ Stanza::Iq(Iq {
            from: _,
            ref id,
            to: _,
            r#type,
            lang: _,
            query: _,
            ref errors,
        }) if *id == iq_id && r#type == IqType::Error => {
            if let Some(error) = errors.first() {
                Err(RosterError::StanzaError(error.clone()))
            } else {
                Err(RosterError::UnexpectedStanza(s.clone()))
            }
        }
        s => Err(RosterError::UnexpectedStanza(s)),
    }
}

pub async fn handle_buddy_request<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), SubscribeError> {
    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
    let nick = client_user.nick.map(|nick| Nick(nick));
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid.clone()),
        r#type: Some(stanza::client::presence::PresenceType::Subscribe),
        nick,
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Subscribed),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_subscription_request<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), SubscribeError> {
    // TODO: i should probably have builders
    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
    let nick = client_user.nick.map(|nick| Nick(nick));
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Subscribe),
        nick,
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), SubscribeError> {
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid.clone()),
        r#type: Some(stanza::client::presence::PresenceType::Subscribed),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
    let nick = client_user.nick.map(|nick| Nick(nick));
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Subscribe),
        nick,
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), SubscribeError> {
    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
    let nick = client_user.nick.map(|nick| Nick(nick));
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Subscribe),
        nick,
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_unsubscribe_from_contact(
    connection: Connected,
    jid: JID,
) -> Result<(), WriteError> {
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid.clone()),
        r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    let presence = Stanza::Presence(stanza::client::presence::Presence {
        to: Some(jid),
        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
        ..Default::default()
    });
    connection.write_handle().write(presence).await?;
    Ok(())
}

pub async fn handle_delete_contact<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
) -> Result<(), RosterError> {
    let iq_id = Uuid::new_v4().to_string();
    let set_stanza = Stanza::Iq(Iq {
        from: Some(connection.jid().clone()),
        id: iq_id.clone(),
        to: None,
        r#type: IqType::Set,
        lang: None,
        query: Some(iq::Query::Roster(stanza::roster::Query {
            ver: None,
            items: vec![stanza::roster::Item {
                approved: None,
                ask: false,
                jid,
                name: None,
                subscription: Some(stanza::roster::Subscription::Remove),
                groups: Vec::new(),
            }],
        })),
        errors: Vec::new(),
    });
    let result = logic
        .pending()
        .request(&connection, set_stanza, iq_id.clone())
        .await?;
    match result {
        Stanza::Iq(Iq {
            from: _,
            id,
            to: _,
            r#type,
            lang: _,
            query: _,
            errors: _,
            // don't really need to check matching id as request() does this anyway
        }) if id == iq_id && r#type == IqType::Result => Ok(()),
        ref s @ Stanza::Iq(Iq {
            from: _,
            ref id,
            to: _,
            r#type,
            lang: _,
            query: _,
            ref errors,
        }) if *id == iq_id && r#type == IqType::Error => {
            if let Some(error) = errors.first() {
                Err(RosterError::StanzaError(error.clone()))
            } else {
                Err(RosterError::UnexpectedStanza(s.clone()))
            }
        }
        s => Err(RosterError::UnexpectedStanza(s)),
    }
}

pub async fn handle_update_contact<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: JID,
    contact_update: ContactUpdate,
) -> Result<(), RosterError> {
    let iq_id = Uuid::new_v4().to_string();
    let groups = Vec::from_iter(
        contact_update
            .groups
            .into_iter()
            .map(|group| stanza::roster::Group(Some(group))),
    );
    let set_stanza = Stanza::Iq(Iq {
        from: Some(connection.jid().clone()),
        id: iq_id.clone(),
        to: None,
        r#type: IqType::Set,
        lang: None,
        query: Some(iq::Query::Roster(stanza::roster::Query {
            ver: None,
            items: vec![stanza::roster::Item {
                approved: None,
                ask: false,
                jid,
                name: contact_update.name,
                subscription: None,
                groups,
            }],
        })),
        errors: Vec::new(),
    });
    let response = logic
        .pending()
        .request(&connection, set_stanza, iq_id.clone())
        .await?;
    match response {
        Stanza::Iq(Iq {
            from: _,
            id,
            to: _,
            r#type,
            lang: _,
            query: _,
            errors: _,
        }) if id == iq_id && r#type == IqType::Result => Ok(()),
        ref s @ Stanza::Iq(Iq {
            from: _,
            ref id,
            to: _,
            r#type,
            lang: _,
            query: _,
            ref errors,
        }) if *id == iq_id && r#type == IqType::Error => {
            if let Some(error) = errors.first() {
                Err(RosterError::StanzaError(error.clone()))
            } else {
                Err(RosterError::UnexpectedStanza(s.clone()))
            }
        }
        s => Err(RosterError::UnexpectedStanza(s)),
    }
}

pub async fn handle_set_status<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    online: Online,
) -> Result<(), StatusError> {
    logic.db().upsert_cached_status(online.clone()).await?;
    connection
        .write_handle()
        .write(Stanza::Presence(online.into_stanza(None)))
        .await?;
    Ok(())
}

pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {
    // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
    // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
    let have_chatted  = match logic.db().upsert_chat_and_user(&jid).await {
        Ok(have_chatted) => {
            have_chatted
        },
        Err(e) => {
            error!("{}", e);
            false
        },
    };

    let nick;
    let mark_chat_as_chatted;
    if have_chatted == false {
        match logic.db.read_user(logic.bare_jid.clone()).await {
            Ok(u) => {
                nick = u.nick.map(|nick| Nick(nick));
                mark_chat_as_chatted = true;
            }
            Err(e) => {
                logic
                    .handle_error(MessageSendError::GetUserDetails(e.into()).into())
                    .await;
                nick = None;
                mark_chat_as_chatted = false;
            }
        }
    } else {
        nick = None;
        mark_chat_as_chatted = false;
    }

    // generate message struct
    let id = Uuid::new_v4();
    let timestamp = Utc::now();
    let message = Message {
        id,
        from: connection.jid().as_bare(),
        body: body.clone(),
        timestamp,
        delivery: Some(Delivery::Sending),
    };

    // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send
    // TODO: mark these as potentially failed upon client launch
    if let Err(e) = logic
        .db()
        .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone())
        .await
    {
        // TODO: should these really be handle_error or just the error macro?
        logic
            .handle_error(MessageSendError::MessageHistory(e.into()).into())
            .await;
    }

    let from = match logic.db().read_user(logic.bare_jid.clone()).await {
        Ok(u) => u,
        Err(e) => {
            error!("{}", e);
            User {
                jid: logic.bare_jid.clone(),
                nick: None,
                avatar: None,
            }
        },
    };

    // tell the client a message is being sent
    logic
        .update_sender()
        .send(UpdateMessage::Message {
            to: jid.as_bare(),
            message,
            from,
        })
        .await;

    // prepare the message stanza
    let message_stanza = Stanza::Message(stanza::client::message::Message {
        from: Some(connection.jid().clone()),
        id: Some(id.to_string()),
        to: Some(jid.clone()),
        // TODO: specify message type
        r#type: stanza::client::message::MessageType::Chat,
        // TODO: lang ?
        body: Some(stanza::client::message::Body {
            lang: None,
            body: Some(body.body.clone()),
        }),
        // include delay to have a consistent timestamp between server and client
        delay: Some(Delay {
            from: None,
            stamp: timestamp,
        }),
        nick,
        ..Default::default()
    });

    // send the message
    let result = connection
        .write_handle()
        .write(message_stanza.clone())
        .await;
    match result {
        Ok(_) => {
            info!("sent message: {:?}", message_stanza);
            logic
                .update_sender()
                .send(UpdateMessage::MessageDelivery {
                    id,
                    delivery: Delivery::Written,
                    chat: jid.clone(),
                })
                .await;
            if mark_chat_as_chatted {
                debug!("marking chat as chatted");
                if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
                    logic
                        .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
                        .await;
                }
            }
        }
        Err(e) => {
            logic
                .update_sender()
                .send(UpdateMessage::MessageDelivery {
                    id,
                    delivery: Delivery::Failed,
                    chat: jid,
                })
                .await;
            logic.handle_error(MessageSendError::Write(e).into()).await;
        }
    }
}

pub async fn handle_send_presence(
    connection: Connected,
    jid: Option<JID>,
    presence: PresenceType,
) -> Result<(), WriteError> {
    let mut presence: stanza::client::presence::Presence = presence.into();
    presence.to = jid;
    connection
        .write_handle()
        .write(Stanza::Presence(presence))
        .await?;
    Ok(())
}

pub async fn handle_disco_info<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: Option<JID>,
    node: Option<String>,
) -> Result<Info, DiscoError> {
    let id = Uuid::new_v4().to_string();
    let request = Iq {
        from: Some(connection.jid().clone()),
        id: id.clone(),
        to: jid.clone(),
        r#type: IqType::Get,
        lang: None,
        query: Some(Query::DiscoInfo(info::Query {
            node,
            features: Vec::new(),
            identities: Vec::new(),
            extensions: Vec::new(),
        })),
        errors: Vec::new(),
    };
    match logic
        .pending()
        .request(&connection, Stanza::Iq(request), id)
        .await?
    {
        Stanza::Iq(Iq {
            from,
            r#type,
            query,
            mut errors,
            ..
            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
        }) if r#type == IqType::Result || r#type == IqType::Error => {
            if from == jid || {
                if jid == None {
                    from == Some(connection.jid().as_bare())
                } else {
                    false
                }
            } {
                match r#type {
                    IqType::Result => {
                        if let Some(query) = query {
                            match query {
                                Query::DiscoInfo(info) => Ok(info.into()),
                                q => Err(DiscoError::MismatchedQuery(q)),
                            }
                        } else {
                            Err(DiscoError::MissingQuery)
                        }
                    }
                    IqType::Error => {
                        if let Some(error) = errors.pop() {
                            Err(error.into())
                        } else {
                            Err(DiscoError::MissingError)
                        }
                    }
                    _ => unreachable!(),
                }
            } else {
                Err(DiscoError::IncorrectEntity(
                    from.unwrap_or_else(|| connection.jid().as_bare()),
                ))
            }
        }
        s => Err(DiscoError::UnexpectedStanza(s)),
    }
}

pub async fn handle_disco_items<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    jid: Option<JID>,
    node: Option<String>,
) -> Result<Items, DiscoError> {
    let id = Uuid::new_v4().to_string();
    let request = Iq {
        from: Some(connection.jid().clone()),
        id: id.clone(),
        to: jid.clone(),
        r#type: IqType::Get,
        lang: None,
        query: Some(Query::DiscoItems(items::Query {
            node,
            items: Vec::new(),
        })),
        errors: Vec::new(),
    };
    match logic
        .pending()
        .request(&connection, Stanza::Iq(request), id)
        .await?
    {
        Stanza::Iq(Iq {
            from,
            r#type,
            query,
            mut errors,
            ..
            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
        }) if r#type == IqType::Result || r#type == IqType::Error => {
            if from == jid || {
                if jid == None {
                    from == Some(connection.jid().as_bare())
                } else {
                    false
                }
            } {
                match r#type {
                    IqType::Result => {
                        if let Some(query) = query {
                            match query {
                                Query::DiscoItems(items) => Ok(items.into()),
                                q => Err(DiscoError::MismatchedQuery(q)),
                            }
                        } else {
                            Err(DiscoError::MissingQuery)
                        }
                    }
                    IqType::Error => {
                        if let Some(error) = errors.pop() {
                            Err(error.into())
                        } else {
                            Err(DiscoError::MissingError)
                        }
                    }
                    _ => unreachable!(),
                }
            } else {
                Err(DiscoError::IncorrectEntity(
                    from.unwrap_or_else(|| connection.jid().as_bare()),
                ))
            }
        }
        s => Err(DiscoError::UnexpectedStanza(s)),
    }
}

pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    item: pep::Item,
    node: String,
) -> Result<(), PEPError> {
    let id = Uuid::new_v4().to_string();
    let publish = match item {
        pep::Item::Nick(n) => {
            if let Some(n) = n {
                pubsub::Publish {
                    node,
                    items: vec![pubsub::Item {
                        item: Some(pubsub::Content::Nick(Nick(n))),
                        ..Default::default()
                    }],
                }
            } else {
                pubsub::Publish {
                    node,
                    items: vec![pubsub::Item {
                        item: Some(pubsub::Content::Nick(Nick("".to_string()))),
                        ..Default::default()
                    }]
                }
            }
        },
        pep::Item::AvatarMetadata(metadata) => {
            if let Some(metadata) = metadata {
                pubsub::Publish { node, items: vec![pubsub::Item {
                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })),
                    id: Some(metadata.hash),
                    ..Default::default()
                }]}
            } else {
                pubsub::Publish { node, items: vec![pubsub::Item {
                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })),
                    ..Default::default()
                }]}
            }
        },
        pep::Item::AvatarData(data) => {
            if let Some(data) = data {
                pubsub::Publish { node, items: vec![pubsub::Item {
                    item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))),
                    id: Some(data.hash),
                    ..Default::default()
                }] }
            } else {
                pubsub::Publish { node, items: vec![pubsub::Item {
                    item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))),
                    ..Default::default()
                }]}
            }
        },
    };
    let request = Iq {
        from: Some(connection.jid().clone()),
        id: id.clone(),
        to: None,
        r#type: IqType::Set,
        lang: None,
        query: Some(Query::Pubsub(Pubsub::Publish(publish, None))),
        errors: Vec::new(),
    };
    match logic
        .pending()
        .request(&connection, Stanza::Iq(request), id)
        .await? {
        
        Stanza::Iq(Iq {
            from,
            r#type,
            query,
            errors,
            ..
            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
        }) if r#type == IqType::Result || r#type == IqType::Error => {
            if from == None || 
                    from == Some(connection.jid().as_bare())
             {
                match r#type {
                    IqType::Result => {
                        if let Some(query) = query {
                            match query {
                                Query::Pubsub(_) => Ok(()),
                                q => Err(PEPError::MismatchedQuery(q)),
                            }
                        } else {
                            Err(PEPError::MissingQuery)
                        }
                    }
                    IqType::Error => {
                        Err(PEPError::StanzaErrors(errors))
                    }
                    _ => unreachable!(),
                }
            } else {
                Err(PEPError::IncorrectEntity(
                    from.unwrap_or_else(|| connection.jid().as_bare()),
                ))
            }
        }
        s => Err(PEPError::UnexpectedStanza(s)),
    }
}

pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> {
    let stanza_id = Uuid::new_v4().to_string();
    let request = Iq {
        from: Some(connection.jid().clone()),
        id: stanza_id.clone(),
        to: jid.clone(),
        r#type: IqType::Get,
        lang: None,
        query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items {
            max_items: None,
            node,
            subid: None,
            items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }],
        }))),
        errors: Vec::new(),
    };
    match logic
        .pending()
        .request(&connection, Stanza::Iq(request), stanza_id)
        .await? {
        
        Stanza::Iq(Iq {
            from,
            r#type,
            query,
            errors,
            ..
            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
        }) if r#type == IqType::Result || r#type == IqType::Error => {
            if from == jid || {
                if jid == None {
                    from == Some(connection.jid().as_bare())
                } else {
                    false
                }
            } {
                match r#type {
                    IqType::Result => {
                        if let Some(query) = query {
                            match query {
                                Query::Pubsub(Pubsub::Items(mut items)) => {
                                    if let Some(item) = items.items.pop() {
                                        if item.id == Some(id.clone()) {
                                            match item.item.ok_or(PEPError::MissingItem)? {
                                                pubsub::Content::Nick(nick) => {
                                                    if nick.0.is_empty() {
                                                        Ok(pep::Item::Nick(None))
                                                    } else {
                                                        Ok(pep::Item::Nick(Some(nick.0)))
                                                        
                                                    }
                                                },
                                                pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))),
                                                pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))),
                                                pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem),
                                            }
                                        } else {
                                            Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string())))
                                        }
                                    } else {
                                        Err(PEPError::MissingItem)
                                    }
                                },
                                q => Err(PEPError::MismatchedQuery(q)),
                            }
                        } else {
                            Err(PEPError::MissingQuery)
                        }
                    }
                    IqType::Error => {
                        Err(PEPError::StanzaErrors(errors))
                    }
                    _ => unreachable!(),
                }
            } else {
                // TODO: include expected entity
                Err(PEPError::IncorrectEntity(
                    from.unwrap_or_else(|| connection.jid().as_bare()),
                ))
            }
        }
        s => Err(PEPError::UnexpectedStanza(s)),
    }
}

pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {
    logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
    Ok(())
}

pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
    match img_data {
        // set avatar
        Some(data) => {
            // load the image data and guess the format
            let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;

            // convert the image to png;
            let mut data_png = Vec::new();
            let image = image.resize(192, 192, image::imageops::FilterType::Nearest);
            image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?;

            // calculate the length of the data in bytes.
            let bytes = data_png.len().try_into()?;

            // calculate sha1 hash of the data
            let mut sha1 = Sha1::new();
            sha1.update(&data_png);
            let sha1_result = sha1.finalize();
            let hash = hex::encode(sha1_result);

            // encode the image data as base64
            let data_b64 = BASE64_STANDARD.encode(data_png.clone());

            // publish the data to the data node
            logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?;

            // publish the metadata to the metadata node
            logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?;

            // if everything went well, save the data to the disk.

            if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? {
                logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))?
            }
            // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval.
            // TODO: should the node be purged?

            Ok(())
        },
        // remove avatar
        None => {
            logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?;
            logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?;
            Ok(())
        },
    }
}

pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    connection: Connected,
    node: String,
) -> Result<(), PEPError> {
    let id = Uuid::new_v4().to_string();
    let request = Iq {
        from: Some(connection.jid().clone()),
        id: id.clone(),
        to: None,
        r#type: IqType::Set,
        lang: None,
        query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))),
        errors: Vec::new(),
    };
    match logic
        .pending()
        .request(&connection, Stanza::Iq(request), id)
        .await? {
        
        Stanza::Iq(Iq {
            from,
            r#type,
            query,
            errors,
            ..
            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
        }) if r#type == IqType::Result || r#type == IqType::Error => {
            if from == None || 
                    from == Some(connection.jid().as_bare())
             {
                match r#type {
                    IqType::Result => {
                        if let Some(query) = query {
                            match query {
                                Query::PubsubOwner(_) => Ok(()),
                                q => Err(PEPError::MismatchedQuery(q)),
                            }
                        } else {
                            // Err(PEPError::MissingQuery)
                            Ok(())
                        }
                    }
                    IqType::Error => {
                        Err(PEPError::StanzaErrors(errors))
                    }
                    _ => unreachable!(),
                }
            } else {
                Err(PEPError::IncorrectEntity(
                    from.unwrap_or_else(|| connection.jid().as_bare()),
                ))
            }
        }
        s => Err(PEPError::UnexpectedStanza(s)),
    }
}

// TODO: could probably macro-ise?
pub async fn handle_online_result<Fs: FileStore + Clone>(
    logic: &ClientLogic<Fs>,
    command: Command<Fs>,
    connection: Connected,
) -> Result<(), Error<Fs>> {
    match command {
        Command::GetRoster(result_sender) => {
            let roster = handle_get_roster(logic, connection).await;
            let _ = result_sender.send(roster);
        }
        Command::GetRosterWithUsers(result_sender) => {
            let roster = handle_get_roster_with_users(logic, connection).await;
            let _ = result_sender.send(roster);
        }
        Command::GetChats(sender) => {
            let chats = handle_get_chats(logic).await;
            let _ = sender.send(chats);
        }
        Command::GetChatsOrdered(sender) => {
            let chats = handle_get_chats_ordered(logic).await;
            let _ = sender.send(chats);
        }
        Command::GetChatsOrderedWithLatestMessages(sender) => {
            let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
            let _ = sender.send(chats);
        }
        Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
            let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
            sender.send(chats);
        }
        Command::GetChat(jid, sender) => {
            let chat = handle_get_chat(logic, jid).await;
            let _ = sender.send(chat);
        }
        Command::GetMessages(jid, sender) => {
            let messages = handle_get_messages(logic, jid).await;
            let _ = sender.send(messages);
        }
        Command::GetMessagesWithUsers(jid, sender) => {
            let messages = handle_get_messages_with_users(logic, jid).await;
            sender.send(messages);
        }
        Command::DeleteChat(jid, sender) => {
            let result = handle_delete_chat(logic, jid).await;
            let _ = sender.send(result);
        }
        Command::DeleteMessage(uuid, sender) => {
            let result = handle_delete_messaage(logic, uuid).await;
            let _ = sender.send(result);
        }
        Command::GetUser(jid, sender) => {
            let user = handle_get_user(logic, jid).await;
            let _ = sender.send(user);
        }
        Command::AddContact(jid, sender) => {
            let result = handle_add_contact(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::BuddyRequest(jid, sender) => {
            let result = handle_buddy_request(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::SubscriptionRequest(jid, sender) => {
            let result = handle_subscription_request(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::AcceptBuddyRequest(jid, sender) => {
            let result = handle_accept_buddy_request(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::AcceptSubscriptionRequest(jid, sender) => {
            let result = handle_accept_subscription_request(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::UnsubscribeFromContact(jid, sender) => {
            let result = handle_unsubscribe_from_contact(connection, jid).await;
            let _ = sender.send(result);
        }
        Command::UnsubscribeContact(jid, sender) => {
            let result = handle_unsubscribe_contact(connection, jid).await;
            let _ = sender.send(result);
        }
        Command::UnfriendContact(jid, sender) => {
            let result = handle_unfriend_contact(connection, jid).await;
            let _ = sender.send(result);
        }
        Command::DeleteContact(jid, sender) => {
            let result = handle_delete_contact(logic, connection, jid).await;
            let _ = sender.send(result);
        }
        Command::UpdateContact(jid, contact_update, sender) => {
            let result = handle_update_contact(logic, connection, jid, contact_update).await;
            let _ = sender.send(result);
        }
        Command::SetStatus(online, sender) => {
            let result = handle_set_status(logic, connection, online).await;
            let _ = sender.send(result);
        }
        Command::SendMessage(jid, body) => {
            handle_send_message(logic, connection, jid, body).await;
        }
        Command::SendPresence(jid, presence, sender) => {
            let result = handle_send_presence(connection, jid, presence).await;
            let _ = sender.send(result);
        }
        Command::DiscoInfo(jid, node, sender) => {
            let result = handle_disco_info(logic, connection, jid, node).await;
            let _ = sender.send(result);
        }
        Command::DiscoItems(jid, node, sender) => {
            let result = handle_disco_items(logic, connection, jid, node).await;
            let _ = sender.send(result);
        }
        Command::PublishPEPItem { item, node, sender } => {
            let result = handle_publish_pep_item(logic, connection, item, node).await;
            let _ = sender.send(result);
        }
        Command::ChangeNick(nick, sender) => {
            let result = handle_change_nick(logic, nick).await;
            let _ = sender.send(result);
        }
        Command::ChangeAvatar(img_data, sender) => {
            let result = handle_change_avatar(logic, img_data).await;
            let _ = sender.send(result);
        },
        Command::DeletePEPNode { node, sender } => {
            let result = handle_delete_pep_node(logic, connection, node).await;
            let _ = sender.send(result);
        },
        Command::GetPEPItem { node, sender, jid, id } => {
            let result = handle_get_pep_item(logic, connection, jid, node, id).await;
            let _ = sender.send(result);
        },
    }
    Ok(())
}