diff options
Diffstat (limited to 'filamento/src/lib.rs')
-rw-r--r-- | filamento/src/lib.rs | 1124 |
1 files changed, 3 insertions, 1121 deletions
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index db59a67..030dc43 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -19,6 +19,7 @@ use lampada::{ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, }; +use logic::ClientLogic; use presence::{Offline, Online, Presence, PresenceType, Show}; use roster::{Contact, ContactUpdate}; use stanza::client::{ @@ -36,6 +37,7 @@ use uuid::Uuid; pub mod chat; pub mod db; pub mod error; +mod logic; pub mod presence; pub mod roster; pub mod user; @@ -145,11 +147,7 @@ impl Client { let (_sup_send, sup_recv) = oneshot::channel(); let sup_recv = sup_recv.fuse(); - let logic = ClientLogic { - db, - pending: Arc::new(Mutex::new(HashMap::new())), - update_sender: update_send, - }; + let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send); let actor: CoreClient<ClientLogic> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); @@ -450,1122 +448,6 @@ impl Client { } } -#[derive(Clone)] -pub struct ClientLogic { - db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - update_sender: mpsc::Sender<UpdateMessage>, -} - -impl Logic for ClientLogic { - type Cmd = Command; - - async fn handle_connect(self, connection: Connected) { - let (send, recv) = oneshot::channel(); - debug!("getting roster"); - self.clone() - .handle_online(Command::GetRoster(send), connection.clone()) - .await; - debug!("sent roster req"); - let roster = recv.await; - debug!("got roster"); - match roster { - Ok(r) => match r { - Ok(roster) => { - let online = self.db.read_cached_status().await; - let online = match online { - Ok(online) => online, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::StatusCacheError(e.into()), - ))) - .await; - Online::default() - } - }; - let (send, recv) = oneshot::channel(); - self.clone() - .handle_online( - Command::SendPresence(None, PresenceType::Online(online.clone()), send), - connection, - ) - .await; - let set_status = recv.await; - match set_status { - Ok(s) => match s { - Ok(()) => { - let _ = self - .update_sender - .send(UpdateMessage::Online(online, roster)) - .await; - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::SendPresence(WriteError::Actor(e.into())), - ))) - .await; - } - } - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( - e.into(), - ))), - ))) - .await; - } - } - } - - async fn handle_disconnect(self, connection: Connected) { - // TODO: be able to set offline status message - let offline_presence: stanza::client::presence::Presence = - Offline::default().into_stanza(None); - let stanza = Stanza::Presence(offline_presence); - // TODO: timeout and error check - connection.write_handle().write(stanza).await; - let _ = self - .update_sender - .send(UpdateMessage::Offline(Offline::default())) - .await; - } - - async fn handle_stanza( - self, - stanza: Stanza, - connection: Connected, - supervisor: SupervisorSender, - ) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; - // TODO: can this be more efficient? - let result = self - .db - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; - } - } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = self - .update_sender - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; - } - } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = self.pending.lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - self.db.delete_contact(item.jid.clone()).await; - self.update_sender - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = - self.db.upsert_contact(contact.clone()).await - { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } - let _ = self - .update_sender - .send(UpdateMessage::RosterUpdate(contact)) - .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); - } - } - } - } - // TODO: send unsupported to server - _ => {} - } - } else { - // TODO: send error (unsupported) to server - } - } - }, - Stanza::Error(error) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect - } - Stanza::OtherContent(content) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::UnrecognizedContent)); - // TODO: send error to write_thread - } - } - } - - async fn handle_online(self, command: Command, connection: Connected) { - match command { - Command::GetRoster(result_sender) => { - // TODO: jid resource should probably be stored within the connection - debug!("before client_jid lock"); - debug!("after client_jid lock"); - let iq_id = Uuid::new_v4().to_string(); - let (send, iq_recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.to_string(), - to: None, - r#type: IqType::Get, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: Vec::new(), - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - let _ = connection - .write_handle() - .send(WriteMessage { - stanza, - respond_to: send, - }) - .await; - // TODO: timeout - match recv.await { - Ok(Ok(())) => info!("roster request sent"), - Ok(Err(e)) => { - // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError::Write(e.into()))); - return; - } - Err(e) => { - let _ = result_sender - .send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - }; - // TODO: timeout - match iq_recv.await { - Ok(Ok(stanza)) => match stanza { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - let contacts: Vec<Contact> = - items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { - self.update_sender - .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( - e.into(), - )))) - .await; - }; - result_sender.send(Ok(contacts)); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - result_sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - result_sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Ok(Err(e)) => { - result_sender.send(Err(RosterError::Read(e))); - return; - } - Err(e) => { - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: None, - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - // TODO: write_handle send helper function - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::BuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::SubscriptionRequest(jid, sender) => { - // TODO: i should probably have builders - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::AcceptBuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::AcceptSubscriptionRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeFromContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnfriendContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::DeleteContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: Some(stanza::roster::Subscription::Remove), - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::UpdateContact(jid, contact_update, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let groups = Vec::from_iter( - contact_update - .groups - .into_iter() - .map(|group| stanza::roster::Group(Some(group))), - ); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: contact_update.name, - subscription: None, - groups, - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::SetStatus(online, sender) => { - let result = self.db.upsert_cached_status(online.clone()).await; - if let Err(e) = result { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( - e.into(), - )))) - .await; - } - let result = connection - .write_handle() - .write(Stanza::Presence(online.into_stanza(None))) - .await - .map_err(|e| StatusError::Write(e)); - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(jid, body, sender) => { - let id = Uuid::new_v4(); - let message = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid().clone()), - id: Some(id.to_string()), - to: Some(jid.clone()), - // TODO: specify message type - r#type: stanza::client::message::MessageType::Chat, - // TODO: lang ? - lang: None, - subject: None, - body: Some(stanza::client::message::Body { - lang: None, - body: Some(body.body.clone()), - }), - thread: None, - delay: None, - }); - let _ = sender.send(Ok(())); - // let _ = sender.send(Ok(message.clone())); - let result = connection.write_handle().write(message).await; - match result { - Ok(_) => { - let mut message = Message { - id, - from: connection.jid().clone(), - body, - timestamp: Utc::now(), - }; - info!("send message {:?}", message); - if let Err(e) = self - .db - .create_message_with_self_resource_and_chat( - message.clone(), - jid.clone(), - ) - .await - .map_err(|e| e.into()) - { - tracing::error!("{}", e); - let _ = - self.update_sender - .send(UpdateMessage::Error(Error::MessageSend( - error::MessageSendError::MessageHistory(e), - ))); - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: jid, message }) - .await; - } - Err(_) => { - // let _ = sender.send(result); - } - } - } - Command::SendPresence(jid, presence, sender) => { - let mut presence: stanza::client::presence::Presence = presence.into(); - if let Some(jid) = jid { - presence.to = Some(jid); - }; - let result = connection - .write_handle() - .write(Stanza::Presence(presence)) - .await; - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - } - } - - async fn handle_offline(self, command: Command) { - match command { - Command::GetRoster(sender) => { - let roster = self.db.read_cached_roster().await; - match roster { - Ok(roster) => { - let _ = sender.send(Ok(roster)); - } - Err(e) => { - let _ = sender.send(Err(RosterError::Cache(e.into()))); - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::BuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptBuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptSubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeFromContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnfriendContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::DeleteContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::UpdateContact(_jid, _contact_update, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::SetStatus(online, sender) => { - let result = self - .db - .upsert_cached_status(online) - .await - .map_err(|e| StatusError::Cache(e.into())); - sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(_jid, _body, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SendPresence(_jid, _presence, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - } - } - // pub async fn handle_stream_error(self, error) {} - // stanza errors (recoverable) - // pub async fn handle_error(self, error: Error) {} - // when it aborts, must clear iq map no matter what - async fn on_abort(self) { - let mut iqs = self.pending.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } - } - - async fn handle_connection_error(self, error: ConnectionError) { - self.update_sender - .send(UpdateMessage::Error( - ConnectionError::AlreadyConnected.into(), - )) - .await; - } -} - impl From<Command> for CoreClientCommand<Command> { fn from(value: Command) -> Self { CoreClientCommand::Command(value) |