aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/online.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r--filamento/src/logic/online.rs661
1 files changed, 661 insertions, 0 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
new file mode 100644
index 0000000..e8cbb33
--- /dev/null
+++ b/filamento/src/logic/online.rs
@@ -0,0 +1,661 @@
+use chrono::Utc;
+use lampada::{Connected, WriteMessage, error::WriteError};
+use stanza::client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+};
+use tokio::sync::oneshot;
+use tracing::{debug, info};
+use uuid::Uuid;
+
+use crate::{
+ Command, UpdateMessage,
+ chat::Message,
+ error::{Error, MessageSendError, RosterError, StatusError},
+ roster::Contact,
+};
+
+use super::ClientLogic;
+
+pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = connection
+ .write_handle()
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ // TODO: timeout
+ match recv.await {
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ =
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ };
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
+ .await;
+ };
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError::Read(e)));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = logic.db().read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = logic
+ .db()
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = logic.db().read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = logic
+ .db()
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = logic.db().read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ // TODO: write_handle send helper function
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::BuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SubscriptionRequest(jid, sender) => {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::AcceptBuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::AcceptSubscriptionRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeFromContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnfriendContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::DeleteContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::UpdateContact(jid, contact_update, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::SetStatus(online, sender) => {
+ let result = logic.db().upsert_cached_status(online.clone()).await;
+ if let Err(e) = result {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
+ .await;
+ }
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(jid, body, sender) => {
+ let id = Uuid::new_v4();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ delay: None,
+ });
+ let _ = sender.send(Ok(()));
+ // let _ = sender.send(Ok(message.clone()));
+ let result = connection.write_handle().write(message).await;
+ match result {
+ Ok(_) => {
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp: Utc::now(),
+ };
+ info!("send message {:?}", message);
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
+ .await
+ .map_err(|e| e.into())
+ {
+ tracing::error!("{}", e);
+ let _ =
+ logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::MessageSend(
+ MessageSendError::MessageHistory(e),
+ )));
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ }
+ Err(_) => {
+ // let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ }
+}