diff options
| author | 2025-03-27 19:09:35 +0000 | |
|---|---|---|
| committer | 2025-03-27 19:09:35 +0000 | |
| commit | a367aca33fecc03270b5b9ad2a6a21281d760fd8 (patch) | |
| tree | 695270823ee5d55f483875580c509fb2c300bd26 /filamento | |
| parent | 83a6aa0574190137b38331bd53795324139237cf (diff) | |
| download | luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.gz luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.bz2 luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.zip | |
refactor(filamento): handle_online logic
Diffstat (limited to '')
| -rw-r--r-- | filamento/examples/example.rs | 2 | ||||
| -rw-r--r-- | filamento/src/error.rs | 20 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/abort.rs | 5 | ||||
| -rw-r--r-- | filamento/src/logic/connect.rs | 3 | ||||
| -rw-r--r-- | filamento/src/logic/local_only.rs | 47 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 69 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 1152 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 15 | 
9 files changed, 701 insertions, 619 deletions
| diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index e36968d..b1ab6ce 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -20,7 +20,7 @@ async fn main() {      });      client.connect().await.unwrap(); -    tokio::time::sleep(Duration::from_secs(5)).await; +    tokio::time::sleep(Duration::from_secs(15)).await;      info!("sending message");      client          .send_message( diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 6277292..ccb4406 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -73,6 +73,8 @@ pub enum ConnectionJobError {  pub enum RosterError {      #[error("cache: {0}")]      Cache(#[from] DatabaseError), +    #[error("iq response: {0}")] +    IqResponse(#[from] RequestError),      #[error("stream write: {0}")]      Write(#[from] WriteError),      // TODO: display for stanza, to show as xml, same for read error types. @@ -87,6 +89,20 @@ pub enum RosterError {  }  #[derive(Debug, Error, Clone)] +pub enum RequestError { +    #[error("sending request: {0}")] +    Write(#[from] WriteError), +    #[error("receiving expected response: {0}")] +    Read(#[from] ReadError), +} + +#[derive(Debug, Error, Clone)] +pub enum ResponseError { +    #[error("no matching id: {0}")] +    NoMatchingId(String), +} + +#[derive(Debug, Error, Clone)]  #[error("database error: {0}")]  pub struct DatabaseError(pub Arc<sqlx::Error>); @@ -107,8 +123,8 @@ impl From<sqlx::Error> for DatabaseOpenError {  pub enum IqError {      #[error("writing response: {0}")]      WriteError(#[from] WriteError), -    #[error("no iq with id matching `{0}`")] -    NoMatchingId(String), +    #[error("receiving response: `{0}`")] +    ReceivedResponse(#[from] ResponseError),      #[error("incorrect addressee: {0}")]      IncorrectAddressee(jid::JID),  } diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 1e9207c..89da1a3 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -178,12 +178,7 @@ impl Client {              timeout: Duration::from_secs(10),          }; -        let logic = ClientLogic::new( -            client.clone(), -            db, -            Arc::new(Mutex::new(HashMap::new())), -            update_send, -        ); +        let logic = ClientLogic::new(client.clone(), db, update_send);          let actor: CoreClient<ClientLogic> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index 32c4823..df82655 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -3,8 +3,5 @@ use lampada::error::ReadError;  use super::ClientLogic;  pub async fn on_abort(logic: ClientLogic) { -    let mut iqs = logic.pending().lock().await; -    for (_id, sender) in iqs.drain() { -        let _ = sender.send(Err(ReadError::LostConnection)); -    } +    logic.pending().drain().await;  } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index d7b9fee..dc05448 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -19,7 +19,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) {          .await;      debug!("sent roster req");      let roster = recv.await; -    debug!("got roster"); +    debug!("got roster: {:?}", roster);      match roster {          Ok(r) => match r {              Ok(roster) => { @@ -42,6 +42,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) {                      )                      .await;                  let set_status = recv.await; +                debug!("sent initial presence");                  match set_status {                      Ok(s) => match s {                          Ok(()) => { diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs new file mode 100644 index 0000000..3f6fe8d --- /dev/null +++ b/filamento/src/logic/local_only.rs @@ -0,0 +1,47 @@ +use jid::JID; +use uuid::Uuid; + +use crate::{ +    chat::{Chat, Message}, +    error::DatabaseError, +    user::User, +}; + +use super::ClientLogic; + +pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +    Ok(logic.db().read_chats().await?) +} + +pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +    Ok(logic.db().read_chats_ordered().await?) +} + +pub async fn handle_get_chats_ordered_with_latest_messages( +    logic: &ClientLogic, +) -> Result<Vec<(Chat, Message)>, DatabaseError> { +    Ok(logic.db().read_chats_ordered_with_latest_messages().await?) +} + +pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +    Ok(logic.db().read_chat(jid).await?) +} + +pub async fn handle_get_messages( +    logic: &ClientLogic, +    jid: JID, +) -> Result<Vec<Message>, DatabaseError> { +    Ok(logic.db().read_message_history(jid).await?) +} + +pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +    Ok(logic.db().delete_chat(jid).await?) +} + +pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +    Ok(logic.db().delete_message(uuid).await?) +} + +pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +    Ok(logic.db().read_user(jid).await?) +} diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 61d78bf..15c2d12 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,16 +1,21 @@  use std::{collections::HashMap, sync::Arc}; -use lampada::{Logic, error::ReadError}; +use lampada::{Connected, Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot};  use tracing::{error, info, warn}; -use crate::{Client, Command, UpdateMessage, db::Db, error::Error}; +use crate::{ +    Client, Command, UpdateMessage, +    db::Db, +    error::{Error, RequestError, ResponseError}, +};  mod abort;  mod connect;  mod connection_error;  mod disconnect; +mod local_only;  mod offline;  mod online;  mod process_stanza; @@ -19,20 +24,60 @@ mod process_stanza;  pub struct ClientLogic {      client: Client,      db: Db, -    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, +    pending: Pending,      update_sender: mpsc::Sender<UpdateMessage>,  } +#[derive(Clone)] +pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>); + +impl Pending { +    pub fn new() -> Self { +        Self(Arc::new(Mutex::new(HashMap::new()))) +    } + +    pub async fn request( +        &self, +        connection: &Connected, +        request: Stanza, +        id: String, +    ) -> Result<Stanza, RequestError> { +        let (send, recv) = oneshot::channel(); +        { +            self.0.lock().await.insert(id, send); +        } +        connection.write_handle().write(request).await?; +        let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??; +        Ok(stanza) +    } + +    pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> { +        let send; +        { +            send = self.0.lock().await.remove(&id); +        } +        match send { +            Some(send) => { +                let _ = send.send(Ok(response)); +                Ok(()) +            } +            None => Err(ResponseError::NoMatchingId(id)), +        } +    } + +    pub async fn drain(&self) { +        let mut pending = self.0.lock().await; +        for (_id, sender) in pending.drain() { +            let _ = sender.send(Err(ReadError::LostConnection)); +        } +    } +} +  impl ClientLogic { -    pub fn new( -        client: Client, -        db: Db, -        pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, -        update_sender: mpsc::Sender<UpdateMessage>, -    ) -> Self { +    pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {          Self {              db, -            pending, +            pending: Pending::new(),              update_sender,              client,          } @@ -46,8 +91,8 @@ impl ClientLogic {          &self.db      } -    pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> { -        &self.pending.as_ref() +    pub fn pending(&self) -> &Pending { +        &self.pending      }      pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 967ebb2..8bbeaa5 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,8 +1,12 @@  use chrono::Utc; +use jid::JID;  use lampada::{Connected, WriteMessage, error::WriteError}; -use stanza::client::{ -    Stanza, -    iq::{self, Iq, IqType}, +use stanza::{ +    client::{ +        Stanza, +        iq::{self, Iq, IqType}, +    }, +    xep_0203::Delay,  };  use tokio::sync::oneshot;  use tracing::{debug, info}; @@ -10,641 +14,625 @@ use uuid::Uuid;  use crate::{      Command, UpdateMessage, -    chat::Message, -    error::{Error, MessageSendError, RosterError, StatusError}, -    roster::Contact, +    chat::{Body, Message}, +    error::{DatabaseError, Error, MessageSendError, RosterError, StatusError}, +    presence::{Online, Presence, PresenceType}, +    roster::{Contact, ContactUpdate},  }; -use super::ClientLogic; +use super::{ +    ClientLogic, +    local_only::{ +        handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, +        handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, +        handle_get_messages, handle_get_user, +    }, +};  pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { -    match command { -        Command::GetRoster(result_sender) => { -            let iq_id = Uuid::new_v4().to_string(); -            let (send, iq_recv) = oneshot::channel(); -            { -                logic.pending().lock().await.insert(iq_id.clone(), send); -            } -            let stanza = Stanza::Iq(Iq { -                from: Some(connection.jid().clone()), -                id: iq_id.to_string(), -                to: None, -                r#type: IqType::Get, -                lang: None, -                query: Some(iq::Query::Roster(stanza::roster::Query { -                    ver: None, -                    items: Vec::new(), -                })), -                errors: Vec::new(), -            }); -            let (send, recv) = oneshot::channel(); -            let _ = connection -                .write_handle() -                .send(WriteMessage { -                    stanza, -                    respond_to: send, -                }) -                .await; -            // TODO: timeout -            match recv.await { -                Ok(Ok(())) => info!("roster request sent"), -                Ok(Err(e)) => { -                    // TODO: log errors if fail to send -                    let _ = result_sender.send(Err(RosterError::Write(e.into()))); -                    return; -                } -                Err(e) => { -                    let _ = -                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                    return; -                } +    let result = handle_online_result(&logic, command, connection).await; +    match result { +        Ok(_) => {} +        Err(e) => logic.handle_error(e).await, +    } +} + +pub async fn handle_get_roster( +    logic: &ClientLogic, +    connection: Connected, +) -> Result<Vec<Contact>, RosterError> { +    let iq_id = Uuid::new_v4().to_string(); +    let stanza = Stanza::Iq(Iq { +        from: Some(connection.jid().clone()), +        id: iq_id.to_string(), +        to: None, +        r#type: IqType::Get, +        lang: None, +        query: Some(iq::Query::Roster(stanza::roster::Query { +            ver: None, +            items: Vec::new(), +        })), +        errors: Vec::new(), +    }); +    let response = logic +        .pending() +        .request(&connection, stanza, iq_id.clone()) +        .await?; +    // TODO: timeout +    match response { +        Stanza::Iq(Iq { +            from: _, +            id, +            to: _, +            r#type, +            lang: _, +            query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), +            errors: _, +        }) if id == iq_id && r#type == IqType::Result => { +            let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); +            if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { +                logic +                    .handle_error(Error::Roster(RosterError::Cache(e.into()))) +                    .await;              }; -            // TODO: timeout -            match iq_recv.await { -                Ok(Ok(stanza)) => match stanza { -                    Stanza::Iq(Iq { -                        from: _, -                        id, -                        to: _, -                        r#type, -                        lang: _, -                        query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), -                        errors: _, -                    }) if id == iq_id && r#type == IqType::Result => { -                        let contacts: Vec<Contact> = -                            items.into_iter().map(|item| item.into()).collect(); -                        if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { -                            logic -                                .handle_error(Error::Roster(RosterError::Cache(e.into()))) -                                .await; -                        }; -                        result_sender.send(Ok(contacts)); -                        return; -                    } -                    ref s @ Stanza::Iq(Iq { -                        from: _, -                        ref id, -                        to: _, -                        r#type, -                        lang: _, -                        query: _, -                        ref errors, -                    }) if *id == iq_id && r#type == IqType::Error => { -                        if let Some(error) = errors.first() { -                            result_sender.send(Err(RosterError::StanzaError(error.clone()))); -                        } else { -                            result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                        } -                        return; -                    } -                    s => { -                        result_sender.send(Err(RosterError::UnexpectedStanza(s))); -                        return; -                    } -                }, -                Ok(Err(e)) => { -                    result_sender.send(Err(RosterError::Read(e))); -                    return; -                } -                Err(e) => { -                    result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                    return; -                } +            Ok(contacts) +        } +        ref s @ Stanza::Iq(Iq { +            from: _, +            ref id, +            to: _, +            r#type, +            lang: _, +            query: _, +            ref errors, +        }) if *id == iq_id && r#type == IqType::Error => { +            if let Some(error) = errors.first() { +                Err(RosterError::StanzaError(error.clone())) +            } else { +                Err(RosterError::UnexpectedStanza(s.clone())) +            } +        } +        s => Err(RosterError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_add_contact( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +) -> Result<(), RosterError> { +    let iq_id = Uuid::new_v4().to_string(); +    let set_stanza = Stanza::Iq(Iq { +        from: Some(connection.jid().clone()), +        id: iq_id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(iq::Query::Roster(stanza::roster::Query { +            ver: None, +            items: vec![stanza::roster::Item { +                approved: None, +                ask: false, +                jid, +                name: None, +                subscription: None, +                groups: Vec::new(), +            }], +        })), +        errors: Vec::new(), +    }); +    let response = logic +        .pending() +        .request(&connection, set_stanza, iq_id.clone()) +        .await?; +    match response { +        Stanza::Iq(Iq { +            from: _, +            id, +            to: _, +            r#type, +            lang: _, +            query: _, +            errors: _, +        }) if id == iq_id && r#type == IqType::Result => Ok(()), +        ref s @ Stanza::Iq(Iq { +            from: _, +            ref id, +            to: _, +            r#type, +            lang: _, +            query: _, +            ref errors, +        }) if *id == iq_id && r#type == IqType::Error => { +            if let Some(error) = errors.first() { +                Err(RosterError::StanzaError(error.clone())) +            } else { +                Err(RosterError::UnexpectedStanza(s.clone())) +            } +        } +        s => Err(RosterError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid.clone()), +        r#type: Some(stanza::client::presence::PresenceType::Subscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Subscribed), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_subscription_request( +    connection: Connected, +    jid: JID, +) -> Result<(), WriteError> { +    // TODO: i should probably have builders +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Subscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_accept_buddy_request( +    connection: Connected, +    jid: JID, +) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid.clone()), +        r#type: Some(stanza::client::presence::PresenceType::Subscribed), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Subscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_accept_subscription_request( +    connection: Connected, +    jid: JID, +) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Subscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_unsubscribe_from_contact( +    connection: Connected, +    jid: JID, +) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid.clone()), +        r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    let presence = Stanza::Presence(stanza::client::presence::Presence { +        from: None, +        id: None, +        to: Some(jid), +        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +        lang: None, +        show: None, +        status: None, +        priority: None, +        errors: Vec::new(), +        delay: None, +    }); +    connection.write_handle().write(presence).await?; +    Ok(()) +} + +pub async fn handle_delete_contact( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +) -> Result<(), RosterError> { +    let iq_id = Uuid::new_v4().to_string(); +    let set_stanza = Stanza::Iq(Iq { +        from: Some(connection.jid().clone()), +        id: iq_id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(iq::Query::Roster(stanza::roster::Query { +            ver: None, +            items: vec![stanza::roster::Item { +                approved: None, +                ask: false, +                jid, +                name: None, +                subscription: Some(stanza::roster::Subscription::Remove), +                groups: Vec::new(), +            }], +        })), +        errors: Vec::new(), +    }); +    let result = logic +        .pending() +        .request(&connection, set_stanza, iq_id.clone()) +        .await?; +    match result { +        Stanza::Iq(Iq { +            from: _, +            id, +            to: _, +            r#type, +            lang: _, +            query: _, +            errors: _, +            // don't really need to check matching id as request() does this anyway +        }) if id == iq_id && r#type == IqType::Result => Ok(()), +        ref s @ Stanza::Iq(Iq { +            from: _, +            ref id, +            to: _, +            r#type, +            lang: _, +            query: _, +            ref errors, +        }) if *id == iq_id && r#type == IqType::Error => { +            if let Some(error) = errors.first() { +                Err(RosterError::StanzaError(error.clone())) +            } else { +                Err(RosterError::UnexpectedStanza(s.clone())) +            } +        } +        s => Err(RosterError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_update_contact( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +    contact_update: ContactUpdate, +) -> Result<(), RosterError> { +    let iq_id = Uuid::new_v4().to_string(); +    let groups = Vec::from_iter( +        contact_update +            .groups +            .into_iter() +            .map(|group| stanza::roster::Group(Some(group))), +    ); +    let set_stanza = Stanza::Iq(Iq { +        from: Some(connection.jid().clone()), +        id: iq_id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(iq::Query::Roster(stanza::roster::Query { +            ver: None, +            items: vec![stanza::roster::Item { +                approved: None, +                ask: false, +                jid, +                name: contact_update.name, +                subscription: None, +                groups, +            }], +        })), +        errors: Vec::new(), +    }); +    let response = logic +        .pending() +        .request(&connection, set_stanza, iq_id.clone()) +        .await?; +    match response { +        Stanza::Iq(Iq { +            from: _, +            id, +            to: _, +            r#type, +            lang: _, +            query: _, +            errors: _, +        }) if id == iq_id && r#type == IqType::Result => Ok(()), +        ref s @ Stanza::Iq(Iq { +            from: _, +            ref id, +            to: _, +            r#type, +            lang: _, +            query: _, +            ref errors, +        }) if *id == iq_id && r#type == IqType::Error => { +            if let Some(error) = errors.first() { +                Err(RosterError::StanzaError(error.clone())) +            } else { +                Err(RosterError::UnexpectedStanza(s.clone()))              }          } +        s => Err(RosterError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_set_status( +    logic: &ClientLogic, +    connection: Connected, +    online: Online, +) -> Result<(), StatusError> { +    logic +        .db() +        .upsert_cached_status(online.clone()) +        .await +        .map_err(|e| DatabaseError(e.into()))?; +    connection +        .write_handle() +        .write(Stanza::Presence(online.into_stanza(None))) +        .await?; +    Ok(()) +} + +pub async fn handle_send_message( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +    body: Body, +) -> Result<(), WriteError> { +    let id = Uuid::new_v4(); +    let timestamp = Utc::now(); +    let message = Stanza::Message(stanza::client::message::Message { +        from: Some(connection.jid().clone()), +        id: Some(id.to_string()), +        to: Some(jid.clone()), +        // TODO: specify message type +        r#type: stanza::client::message::MessageType::Chat, +        // TODO: lang ? +        lang: None, +        subject: None, +        body: Some(stanza::client::message::Body { +            lang: None, +            body: Some(body.body.clone()), +        }), +        thread: None, +        // include delay to have a consistent timestamp between server and client +        delay: Some(Delay { +            from: None, +            stamp: timestamp, +        }), +    }); +    connection.write_handle().write(message).await?; +    let mut message = Message { +        id, +        from: connection.jid().clone(), +        body, +        timestamp, +    }; +    info!("sent message: {:?}", message); +    if let Err(e) = logic +        .db() +        .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) +        .await +    { +        // TODO: should these really be handle_error or just the error macro? +        logic +            .handle_error(MessageSendError::MessageHistory(e.into()).into()) +            .await; +    } +    // TODO: don't do this, have separate from from details +    message.from = message.from.as_bare(); +    let _ = logic +        .update_sender() +        .send(UpdateMessage::Message { to: jid, message }) +        .await; +    Ok(()) +    // TODO: refactor this to send a sending updatemessage, then update or something like that +} + +pub async fn handle_send_presence( +    connection: Connected, +    jid: Option<JID>, +    presence: PresenceType, +) -> Result<(), WriteError> { +    let mut presence: stanza::client::presence::Presence = presence.into(); +    presence.to = jid; +    connection +        .write_handle() +        .write(Stanza::Presence(presence)) +        .await?; +    Ok(()) +} + +// TODO: could probably macro-ise? +pub async fn handle_online_result( +    logic: &ClientLogic, +    command: Command, +    connection: Connected, +) -> Result<(), Error> { +    match command { +        Command::GetRoster(result_sender) => { +            let roster = handle_get_roster(logic, connection).await; +            let _ = result_sender.send(roster); +        }          Command::GetChats(sender) => { -            let chats = logic.db().read_chats().await.map_err(|e| e.into()); -            sender.send(chats); +            let chats = handle_get_chats(logic).await; +            let _ = sender.send(chats);          }          Command::GetChatsOrdered(sender) => { -            let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); -            sender.send(chats); +            let chats = handle_get_chats_ordered(logic).await; +            let _ = sender.send(chats);          }          Command::GetChatsOrderedWithLatestMessages(sender) => { -            let chats = logic -                .db() -                .read_chats_ordered_with_latest_messages() -                .await -                .map_err(|e| e.into()); -            sender.send(chats); +            let chats = handle_get_chats_ordered_with_latest_messages(logic).await; +            let _ = sender.send(chats);          }          Command::GetChat(jid, sender) => { -            let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); -            sender.send(chats); +            let chat = handle_get_chat(logic, jid).await; +            let _ = sender.send(chat);          }          Command::GetMessages(jid, sender) => { -            let messages = logic -                .db() -                .read_message_history(jid) -                .await -                .map_err(|e| e.into()); -            sender.send(messages); +            let messages = handle_get_messages(logic, jid).await; +            let _ = sender.send(messages);          }          Command::DeleteChat(jid, sender) => { -            let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); -            sender.send(result); +            let result = handle_delete_chat(logic, jid).await; +            let _ = sender.send(result);          }          Command::DeleteMessage(uuid, sender) => { -            let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); -            sender.send(result); +            let result = handle_delete_messaage(logic, uuid).await; +            let _ = sender.send(result);          }          Command::GetUser(jid, sender) => { -            let user = logic.db().read_user(jid).await.map_err(|e| e.into()); -            sender.send(user); +            let user = handle_get_user(logic, jid).await; +            let _ = sender.send(user);          }          // TODO: offline queue to modify roster          Command::AddContact(jid, sender) => { -            let iq_id = Uuid::new_v4().to_string(); -            let set_stanza = Stanza::Iq(Iq { -                from: Some(connection.jid().clone()), -                id: iq_id.clone(), -                to: None, -                r#type: IqType::Set, -                lang: None, -                query: Some(iq::Query::Roster(stanza::roster::Query { -                    ver: None, -                    items: vec![stanza::roster::Item { -                        approved: None, -                        ask: false, -                        jid, -                        name: None, -                        subscription: None, -                        groups: Vec::new(), -                    }], -                })), -                errors: Vec::new(), -            }); -            let (send, recv) = oneshot::channel(); -            { -                logic.pending().lock().await.insert(iq_id.clone(), send); -            } -            // TODO: write_handle send helper function -            let result = connection.write_handle().write(set_stanza).await; -            if let Err(e) = result { -                sender.send(Err(RosterError::Write(e))); -                return; -            } -            let iq_result = recv.await; -            match iq_result { -                Ok(i) => match i { -                    Ok(iq_result) => match iq_result { -                        Stanza::Iq(Iq { -                            from: _, -                            id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            errors: _, -                        }) if id == iq_id && r#type == IqType::Result => { -                            sender.send(Ok(())); -                            return; -                        } -                        ref s @ Stanza::Iq(Iq { -                            from: _, -                            ref id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            ref errors, -                        }) if *id == iq_id && r#type == IqType::Error => { -                            if let Some(error) = errors.first() { -                                sender.send(Err(RosterError::StanzaError(error.clone()))); -                            } else { -                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                            } -                            return; -                        } -                        s => { -                            sender.send(Err(RosterError::UnexpectedStanza(s))); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(e.into())); -                        return; -                    } -                }, -                Err(e) => { -                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                    return; -                } -            } +            let result = handle_add_contact(logic, connection, jid).await; +            let _ = sender.send(result);          }          Command::BuddyRequest(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid.clone()), -                r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; -            match result { -                Err(_) => { -                    let _ = sender.send(result); -                } -                Ok(()) => { -                    let presence = Stanza::Presence(stanza::client::presence::Presence { -                        from: None, -                        id: None, -                        to: Some(jid), -                        r#type: Some(stanza::client::presence::PresenceType::Subscribed), -                        lang: None, -                        show: None, -                        status: None, -                        priority: None, -                        errors: Vec::new(), -                        delay: None, -                    }); -                    let result = connection.write_handle().write(presence).await; -                    let _ = sender.send(result); -                } -            } +            let result = handle_buddy_request(connection, jid).await; +            let _ = sender.send(result);          }          Command::SubscriptionRequest(jid, sender) => { -            // TODO: i should probably have builders -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid), -                r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; +            let result = handle_subscription_request(connection, jid).await;              let _ = sender.send(result);          }          Command::AcceptBuddyRequest(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid.clone()), -                r#type: Some(stanza::client::presence::PresenceType::Subscribed), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; -            match result { -                Err(_) => { -                    let _ = sender.send(result); -                } -                Ok(()) => { -                    let presence = Stanza::Presence(stanza::client::presence::Presence { -                        from: None, -                        id: None, -                        to: Some(jid), -                        r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                        lang: None, -                        show: None, -                        status: None, -                        priority: None, -                        errors: Vec::new(), -                        delay: None, -                    }); -                    let result = connection.write_handle().write(presence).await; -                    let _ = sender.send(result); -                } -            } +            let result = handle_accept_buddy_request(connection, jid).await; +            let _ = sender.send(result);          }          Command::AcceptSubscriptionRequest(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid), -                r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; +            let result = handle_accept_subscription_request(connection, jid).await;              let _ = sender.send(result);          }          Command::UnsubscribeFromContact(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid), -                r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; +            let result = handle_unsubscribe_from_contact(connection, jid).await;              let _ = sender.send(result);          }          Command::UnsubscribeContact(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid), -                r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; +            let result = handle_unsubscribe_contact(connection, jid).await;              let _ = sender.send(result);          }          Command::UnfriendContact(jid, sender) => { -            let presence = Stanza::Presence(stanza::client::presence::Presence { -                from: None, -                id: None, -                to: Some(jid.clone()), -                r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -                lang: None, -                show: None, -                status: None, -                priority: None, -                errors: Vec::new(), -                delay: None, -            }); -            let result = connection.write_handle().write(presence).await; -            match result { -                Err(_) => { -                    let _ = sender.send(result); -                } -                Ok(()) => { -                    let presence = Stanza::Presence(stanza::client::presence::Presence { -                        from: None, -                        id: None, -                        to: Some(jid), -                        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -                        lang: None, -                        show: None, -                        status: None, -                        priority: None, -                        errors: Vec::new(), -                        delay: None, -                    }); -                    let result = connection.write_handle().write(presence).await; -                    let _ = sender.send(result); -                } -            } +            let result = handle_unfriend_contact(connection, jid).await; +            let _ = sender.send(result);          }          Command::DeleteContact(jid, sender) => { -            let iq_id = Uuid::new_v4().to_string(); -            let set_stanza = Stanza::Iq(Iq { -                from: Some(connection.jid().clone()), -                id: iq_id.clone(), -                to: None, -                r#type: IqType::Set, -                lang: None, -                query: Some(iq::Query::Roster(stanza::roster::Query { -                    ver: None, -                    items: vec![stanza::roster::Item { -                        approved: None, -                        ask: false, -                        jid, -                        name: None, -                        subscription: Some(stanza::roster::Subscription::Remove), -                        groups: Vec::new(), -                    }], -                })), -                errors: Vec::new(), -            }); -            let (send, recv) = oneshot::channel(); -            { -                logic.pending().lock().await.insert(iq_id.clone(), send); -            } -            let result = connection.write_handle().write(set_stanza).await; -            if let Err(e) = result { -                sender.send(Err(RosterError::Write(e))); -                return; -            } -            let iq_result = recv.await; -            match iq_result { -                Ok(i) => match i { -                    Ok(iq_result) => match iq_result { -                        Stanza::Iq(Iq { -                            from: _, -                            id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            errors: _, -                        }) if id == iq_id && r#type == IqType::Result => { -                            sender.send(Ok(())); -                            return; -                        } -                        ref s @ Stanza::Iq(Iq { -                            from: _, -                            ref id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            ref errors, -                        }) if *id == iq_id && r#type == IqType::Error => { -                            if let Some(error) = errors.first() { -                                sender.send(Err(RosterError::StanzaError(error.clone()))); -                            } else { -                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                            } -                            return; -                        } -                        s => { -                            sender.send(Err(RosterError::UnexpectedStanza(s))); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(e.into())); -                        return; -                    } -                }, -                Err(e) => { -                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                    return; -                } -            } +            let result = handle_delete_contact(logic, connection, jid).await; +            let _ = sender.send(result);          }          Command::UpdateContact(jid, contact_update, sender) => { -            let iq_id = Uuid::new_v4().to_string(); -            let groups = Vec::from_iter( -                contact_update -                    .groups -                    .into_iter() -                    .map(|group| stanza::roster::Group(Some(group))), -            ); -            let set_stanza = Stanza::Iq(Iq { -                from: Some(connection.jid().clone()), -                id: iq_id.clone(), -                to: None, -                r#type: IqType::Set, -                lang: None, -                query: Some(iq::Query::Roster(stanza::roster::Query { -                    ver: None, -                    items: vec![stanza::roster::Item { -                        approved: None, -                        ask: false, -                        jid, -                        name: contact_update.name, -                        subscription: None, -                        groups, -                    }], -                })), -                errors: Vec::new(), -            }); -            let (send, recv) = oneshot::channel(); -            { -                logic.pending().lock().await.insert(iq_id.clone(), send); -            } -            let result = connection.write_handle().write(set_stanza).await; -            if let Err(e) = result { -                sender.send(Err(RosterError::Write(e))); -                return; -            } -            let iq_result = recv.await; -            match iq_result { -                Ok(i) => match i { -                    Ok(iq_result) => match iq_result { -                        Stanza::Iq(Iq { -                            from: _, -                            id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            errors: _, -                        }) if id == iq_id && r#type == IqType::Result => { -                            sender.send(Ok(())); -                            return; -                        } -                        ref s @ Stanza::Iq(Iq { -                            from: _, -                            ref id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            ref errors, -                        }) if *id == iq_id && r#type == IqType::Error => { -                            if let Some(error) = errors.first() { -                                sender.send(Err(RosterError::StanzaError(error.clone()))); -                            } else { -                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                            } -                            return; -                        } -                        s => { -                            sender.send(Err(RosterError::UnexpectedStanza(s))); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(e.into())); -                        return; -                    } -                }, -                Err(e) => { -                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                    return; -                } -            } +            let result = handle_update_contact(logic, connection, jid, contact_update).await; +            let _ = sender.send(result);          }          Command::SetStatus(online, sender) => { -            let result = logic.db().upsert_cached_status(online.clone()).await; -            if let Err(e) = result { -                logic -                    .handle_error(StatusError::Cache(e.into()).into()) -                    .await; -            } -            let result = connection -                .write_handle() -                .write(Stanza::Presence(online.into_stanza(None))) -                .await -                .map_err(|e| StatusError::Write(e)); -            // .map_err(|e| StatusError::Write(e)); +            let result = handle_set_status(logic, connection, online).await;              let _ = sender.send(result);          }          // TODO: offline message queue          Command::SendMessage(jid, body, sender) => { -            let id = Uuid::new_v4(); -            let message = Stanza::Message(stanza::client::message::Message { -                from: Some(connection.jid().clone()), -                id: Some(id.to_string()), -                to: Some(jid.clone()), -                // TODO: specify message type -                r#type: stanza::client::message::MessageType::Chat, -                // TODO: lang ? -                lang: None, -                subject: None, -                body: Some(stanza::client::message::Body { -                    lang: None, -                    body: Some(body.body.clone()), -                }), -                thread: None, -                delay: None, -            }); -            let _ = sender.send(Ok(())); -            // let _ = sender.send(Ok(message.clone())); -            let result = connection.write_handle().write(message).await; -            match result { -                Ok(_) => { -                    let mut message = Message { -                        id, -                        from: connection.jid().clone(), -                        body, -                        timestamp: Utc::now(), -                    }; -                    info!("send message {:?}", message); -                    if let Err(e) = logic -                        .db() -                        .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) -                        .await -                    { -                        logic -                            .handle_error(MessageSendError::MessageHistory(e.into()).into()) -                            .await; -                    } -                    // TODO: don't do this, have separate from from details -                    message.from = message.from.as_bare(); -                    let _ = logic -                        .update_sender() -                        .send(UpdateMessage::Message { to: jid, message }) -                        .await; -                } -                Err(_) => { -                    // let _ = sender.send(result); -                } -            } +            let result = handle_send_message(logic, connection, jid, body).await; +            let _ = sender.send(result);          }          Command::SendPresence(jid, presence, sender) => { -            let mut presence: stanza::client::presence::Presence = presence.into(); -            if let Some(jid) = jid { -                presence.to = Some(jid); -            }; -            let result = connection -                .write_handle() -                .write(Stanza::Presence(presence)) -                .await; -            // .map_err(|e| StatusError::Write(e)); +            let result = handle_send_presence(connection, jid, presence).await;              let _ = sender.send(result);          }      } +    Ok(())  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 94257aa..660da16 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -169,21 +169,14 @@ pub async fn recv_iq(      }      match iq.r#type {          stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { -            let send; -            { -                send = logic.pending().lock().await.remove(&iq.id); -            }              let from = iq                  .from                  .clone()                  .unwrap_or_else(|| connection.server().clone()); -            if let Some(send) = send { -                debug!("received iq result from {}", from); -                let _ = send.send(Ok(Stanza::Iq(iq))); -                Ok(None) -            } else { -                Err(IqError::NoMatchingId(iq.id)) -            } +            let id = iq.id.clone(); +            debug!("received iq result with id `{}` from {}", id, from); +            logic.pending().respond(Stanza::Iq(iq), id).await?; +            Ok(None)          }          stanza::client::iq::IqType::Get => {              let from = iq | 
