diff options
| author | 2025-02-20 21:08:16 +0000 | |
|---|---|---|
| committer | 2025-02-20 21:08:16 +0000 | |
| commit | 2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98 (patch) | |
| tree | d6191b02372f66c918954acaa570736e544c0193 | |
| parent | c0d2aae0385aee7f1bb2ecb330e72e40b8fde6a2 (diff) | |
| download | luz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.tar.gz luz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.tar.bz2 luz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.zip | |
implement CLIENT
Diffstat (limited to '')
| -rw-r--r-- | luz/migrations/20240113011930_luz.sql | 2 | ||||
| -rw-r--r-- | luz/src/chat.rs | 4 | ||||
| -rw-r--r-- | luz/src/connection/read.rs | 200 | ||||
| -rw-r--r-- | luz/src/db/mod.rs | 65 | ||||
| -rw-r--r-- | luz/src/error.rs | 27 | ||||
| -rw-r--r-- | luz/src/lib.rs | 30 | ||||
| -rw-r--r-- | luz/src/main.rs | 26 | ||||
| -rw-r--r-- | luz/src/presence.rs | 4 | ||||
| -rw-r--r-- | luz/src/roster.rs | 3 | 
9 files changed, 339 insertions, 22 deletions
| diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 082cc4b..028ae24 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -14,7 +14,7 @@ create table subscription(      state text primary key not null  ); -insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); +insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');  -- a roster contains users, with client-set nickname  CREATE TABLE roster(  diff --git a/luz/src/chat.rs b/luz/src/chat.rs index ff76ce1..4fb8579 100644 --- a/luz/src/chat.rs +++ b/luz/src/chat.rs @@ -1,7 +1,7 @@  use jid::JID;  use uuid::Uuid; -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)]  pub struct Message {      pub id: Uuid,      // does not contain full user information @@ -20,7 +20,7 @@ pub struct Message {  //     Outside,  // } -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)]  pub struct Body {      // TODO: rich text, other contents, threads      pub body: String, diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 8f8c4a0..46f1dc9 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,6 +1,7 @@  use std::{      collections::HashMap,      ops::{Deref, DerefMut}, +    str::FromStr,      sync::Arc,      time::Duration,  }; @@ -12,10 +13,14 @@ use tokio::{      task::{JoinHandle, JoinSet},  };  use tracing::info; +use uuid::Uuid;  use crate::{ +    chat::{Body, Message},      db::Db, -    error::{Error, Reason}, +    error::{Error, IqError, PresenceError, Reason, RecvMessageError}, +    presence::{Offline, Online, Presence, Show}, +    roster::Contact,      UpdateMessage,  }; @@ -116,7 +121,7 @@ impl Read {                      println!("read stanza");                      match s {                          Ok(s) => { -                            self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); +                            self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));                          },                          Err(e) => {                              println!("error: {:?}", e); @@ -173,8 +178,197 @@ async fn handle_stanza(      db: Db,      supervisor_control: mpsc::Sender<SupervisorCommand>,      write_handle: WriteHandle, +    pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,  ) { -    println!("{:?}", stanza) +    match stanza { +        Stanza::Message(stanza_message) => { +            if let Some(from) = stanza_message.from { +                // TODO: group chat messages +                let message = Message { +                    id: stanza_message +                        .id +                        // TODO: proper id storage +                        .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                        .unwrap_or_else(|| Uuid::new_v4()), +                    from: from.clone(), +                    body: Body { +                        // TODO: should this be an option? +                        body: stanza_message +                            .body +                            .map(|body| body.body) +                            .unwrap_or_default() +                            .unwrap_or_default(), +                    }, +                }; +                // TODO: can this be more efficient? +                let result = db +                    .create_message_with_user_and_chat(message.clone(), from.clone()) +                    .await; +                if let Err(e) = result { +                    let _ = update_sender +                        .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) +                        .await; +                } +                let _ = update_sender +                    .send(UpdateMessage::Message { to: from, message }) +                    .await; +            } else { +                let _ = update_sender +                    .send(UpdateMessage::Error(Error::RecvMessage( +                        RecvMessageError::MissingFrom, +                    ))) +                    .await; +            } +        } +        Stanza::Presence(presence) => { +            if let Some(from) = presence.from { +                match presence.r#type { +                    Some(r#type) => match r#type { +                        // error processing a presence from somebody +                        stanza::client::presence::PresenceType::Error => { +                            // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. +                            let _ = update_sender +                                .send(UpdateMessage::Error(Error::Presence(PresenceError::Error( +                                    Reason::Stanza(presence.errors.first().cloned()), +                                )))) +                                .await; +                        } +                        // should not happen (error to server) +                        stanza::client::presence::PresenceType::Probe => { +                            // TODO: should probably write an error and restart stream +                            let _ = update_sender +                                .send(UpdateMessage::Error(Error::Presence( +                                    PresenceError::Unsupported, +                                ))) +                                .await; +                        } +                        stanza::client::presence::PresenceType::Subscribe => { +                            // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event +                            let _ = update_sender +                                .send(UpdateMessage::SubscriptionRequest(from)) +                                .await; +                        } +                        stanza::client::presence::PresenceType::Unavailable => { +                            let offline = Offline { +                                status: presence.status.map(|status| status.status.0), +                            }; +                            let _ = update_sender +                                .send(UpdateMessage::Presence { +                                    from, +                                    presence: Presence::Offline(offline), +                                }) +                                .await; +                        } +                        // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. +                        stanza::client::presence::PresenceType::Subscribed => {} +                        stanza::client::presence::PresenceType::Unsubscribe => {} +                        stanza::client::presence::PresenceType::Unsubscribed => {} +                    }, +                    None => { +                        let online = Online { +                            show: presence.show.map(|show| match show { +                                stanza::client::presence::Show::Away => Show::Away, +                                stanza::client::presence::Show::Chat => Show::Chat, +                                stanza::client::presence::Show::Dnd => Show::DoNotDisturb, +                                stanza::client::presence::Show::Xa => Show::ExtendedAway, +                            }), +                            status: presence.status.map(|status| status.status.0), +                            priority: presence.priority.map(|priority| priority.0), +                        }; +                        let _ = update_sender +                            .send(UpdateMessage::Presence { +                                from, +                                presence: Presence::Online(online), +                            }) +                            .await; +                    } +                } +            } else { +                let _ = update_sender +                    .send(UpdateMessage::Error(Error::Presence( +                        PresenceError::MissingFrom, +                    ))) +                    .await; +            } +        } +        Stanza::Iq(iq) => match iq.r#type { +            stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { +                let send; +                { +                    send = pending_iqs.lock().await.remove(&iq.id); +                } +                if let Some(send) = send { +                    send.send(Ok(Stanza::Iq(iq))); +                } else { +                    let _ = update_sender +                        .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( +                            iq.id, +                        )))) +                        .await; +                } +            } +            // TODO: send unsupported to server +            // TODO: proper errors i am so tired please +            stanza::client::iq::IqType::Get => {} +            stanza::client::iq::IqType::Set => { +                if let Some(query) = iq.query { +                    match query { +                        stanza::client::iq::Query::Roster(mut query) => { +                            // TODO: there should only be one +                            if let Some(item) = query.items.pop() { +                                match item.subscription { +                                    Some(stanza::roster::Subscription::Remove) => { +                                        db.delete_contact(item.jid.clone()).await; +                                        update_sender +                                            .send(UpdateMessage::RosterDelete(item.jid)) +                                            .await; +                                        // TODO: send result +                                    } +                                    _ => { +                                        let contact: Contact = item.into(); +                                        if let Err(e) = db.upsert_contact(contact.clone()).await { +                                            let _ = update_sender +                                                .send(UpdateMessage::Error(Error::CacheUpdate( +                                                    e.into(), +                                                ))) +                                                .await; +                                        } +                                        let _ = update_sender +                                            .send(UpdateMessage::RosterUpdate(contact)) +                                            .await; +                                        // TODO: send result +                                        // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { +                                        //     from: , +                                        //     id: todo!(), +                                        //     to: todo!(), +                                        //     r#type: todo!(), +                                        //     lang: todo!(), +                                        //     query: todo!(), +                                        //     errors: todo!(), +                                        // })); +                                    } +                                } +                            } +                        } +                        // TODO: send unsupported to server +                        _ => {} +                    } +                } else { +                    // TODO: send error (unsupported) to server +                } +            } +        }, +        Stanza::Error(error) => { +            let _ = update_sender +                .send(UpdateMessage::Error(Error::Stream(error))) +                .await; +            // TODO: reconnect +        } +        Stanza::OtherContent(content) => { +            let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content))); +            // TODO: send error to write_thread +        } +    }  }  pub enum ReadControl { diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index 7557f70..4202163 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -160,6 +160,48 @@ impl Db {          Ok(())      } +    pub async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { +        sqlx::query!( +            "insert into users ( jid ) values ( ? ) on conflict do nothing", +            contact.user_jid, +        ) +        .execute(&self.db) +        .await?; +        sqlx::query!( +            "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?", +            contact.user_jid, +            contact.name, +            contact.subscription, +            contact.name, +            contact.subscription +        ) +        .execute(&self.db) +        .await?; +        sqlx::query!( +            "delete from groups_roster where contact_jid = ?", +            contact.user_jid +        ) +        .execute(&self.db) +        .await?; +        // TODO: delete orphaned groups from groups table +        for group in contact.groups { +            sqlx::query!( +                "insert into groups (group_name) values (?) on conflict do nothing", +                group +            ) +            .execute(&self.db) +            .await?; +            sqlx::query!( +                "insert into groups_roster (group_name, contact_jid) values (?, ?)", +                group, +                contact.user_jid +            ) +            .execute(&self.db) +            .await?; +        } +        Ok(()) +    } +      pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> {          sqlx::query!("delete from roster where user_jid = ?", contact)              .execute(&self.db) @@ -290,6 +332,29 @@ impl Db {          Ok(())      } +    pub async fn create_message_with_user_and_chat( +        &self, +        message: Message, +        chat: JID, +    ) -> Result<(), Error> { +        sqlx::query!( +            "insert into users (jid) values (?) on conflict do nothing", +            chat +        ) +        .execute(&self.db) +        .await?; +        let id = Uuid::new_v4(); +        sqlx::query!( +            "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", +            id, +            chat +        ) +        .execute(&self.db) +        .await?; +        self.create_message(message, chat).await?; +        Ok(()) +    } +      pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> {          let message: Message = sqlx::query_as("select * from messages where id = ?")              .bind(message) diff --git a/luz/src/error.rs b/luz/src/error.rs index fbbcd2b..4fdce79 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -6,12 +6,35 @@ pub enum Error {      AlreadyConnected,      // TODO: change to Connecting(ConnectingError)      Connection(ConnectionError), -    Presence(Reason), +    Presence(PresenceError), +    SetStatus(Reason),      Roster(Reason), +    Stream(stanza::stream::Error),      SendMessage(Reason), +    RecvMessage(RecvMessageError),      AlreadyDisconnected,      LostConnection, +    // TODO: should all cache update errors include the context?      CacheUpdate(Reason), +    UnrecognizedContent(peanuts::element::Content), +    Iq(IqError), +} + +#[derive(Debug)] +pub enum PresenceError { +    Error(Reason), +    Unsupported, +    MissingFrom, +} + +#[derive(Debug)] +pub enum IqError { +    NoMatchingId(String), +} + +#[derive(Debug)] +pub enum RecvMessageError { +    MissingFrom,  }  #[derive(Debug)] @@ -40,7 +63,7 @@ pub struct StatusError(pub Reason);  impl From<StatusError> for Error {      fn from(e: StatusError) -> Self { -        Error::Presence(e.0) +        Error::SetStatus(e.0)      }  } diff --git a/luz/src/lib.rs b/luz/src/lib.rs index cffffb2..901553b 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -20,7 +20,7 @@ use tokio::{      sync::{mpsc, oneshot, Mutex},      task::JoinSet,  }; -use tracing::{info, Instrument}; +use tracing::{debug, info, Instrument};  use user::User;  use uuid::Uuid; @@ -28,7 +28,7 @@ use crate::connection::write::WriteHandle;  use crate::connection::{SupervisorCommand, SupervisorHandle};  use crate::error::Error; -mod chat; +pub mod chat;  mod connection;  mod db;  mod error; @@ -103,12 +103,19 @@ impl Luz {                                  .await;                          }                          None => { -                            let mut jid = self.jid.lock().await; -                            let mut domain = jid.domainpart.clone(); -                            // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) -                            let streams_result = -                                jabber::connect_and_login(&mut jid, &*self.password, &mut domain) -                                    .await; +                            let streams_result; +                            { +                                let mut jid = self.jid.lock().await; +                                let mut domain = jid.domainpart.clone(); +                                // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) +                                streams_result = jabber::connect_and_login( +                                    &mut jid, +                                    &*self.password, +                                    &mut domain, +                                ) +                                .await; +                                debug!("connected and logged in as {}", jid); +                            }                              match streams_result {                                  Ok(s) => {                                      let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); @@ -124,6 +131,7 @@ impl Luz {                                      self.connection_supervisor_shutdown = shutdown_recv;                                      // TODO: get roster and send initial presence                                      let (send, recv) = oneshot::channel(); +                                    debug!("getting roster");                                      CommandMessage::GetRoster(send)                                          .handle_online(                                              writer.clone(), @@ -134,7 +142,9 @@ impl Luz {                                              self.pending_iqs.clone(),                                          )                                          .await; +                                    debug!("sent roster req");                                      let roster = recv.await; +                                    debug!("got roster");                                      match roster {                                          Ok(r) => {                                              match r { @@ -371,9 +381,11 @@ impl CommandMessage {              CommandMessage::GetRoster(result_sender) => {                  // TODO: jid resource should probably be stored within the connection                  let owned_jid: JID; +                debug!("before client_jid lock");                  {                      owned_jid = client_jid.lock().await.clone();                  } +                debug!("after client_jid lock");                  let iq_id = Uuid::new_v4().to_string();                  let (send, iq_recv) = oneshot::channel();                  { @@ -1062,6 +1074,7 @@ pub enum UpdateMessage {      FullRoster(Vec<Contact>),      /// (only update app roster state, don't replace)      RosterUpdate(Contact), +    RosterDelete(JID),      /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone      Presence {          from: JID, @@ -1073,4 +1086,5 @@ pub enum UpdateMessage {          to: JID,          message: Message,      }, +    SubscriptionRequest(jid::JID),  } diff --git a/luz/src/main.rs b/luz/src/main.rs index 5e9cd13..9779351 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -1,8 +1,13 @@ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; +use jid::JID;  use luz::{CommandMessage, LuzHandle};  use sqlx::SqlitePool; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ +    io::{AsyncReadExt, AsyncWriteExt}, +    sync::oneshot, +}; +use tracing::info;  #[tokio::main]  async fn main() { @@ -13,10 +18,23 @@ async fn main() {      tokio::spawn(async move {          while let Some(msg) = recv.recv().await { -            println!("{:#?}", msg) +            info!("{:#?}", msg)          }      });      luz.send(CommandMessage::Connect).await.unwrap(); -    tokio::time::sleep(Duration::from_secs(15)).await; +    let (send, recv) = oneshot::channel(); +    tokio::time::sleep(Duration::from_secs(5)).await; +    info!("sending message"); +    luz.send(CommandMessage::SendMessage( +        JID::from_str("cel@blos.sm").unwrap(), +        luz::chat::Body { +            body: "hallo!!!".to_string(), +        }, +        send, +    )) +    .await +    .unwrap(); +    recv.await.unwrap().unwrap(); +    println!("sent message");  } diff --git a/luz/src/presence.rs b/luz/src/presence.rs index 563121b..1df20a7 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -6,7 +6,7 @@ pub struct Online {      pub show: Option<Show>,      pub status: Option<String>,      #[sqlx(skip)] -    priority: Option<i8>, +    pub priority: Option<i8>,  }  #[derive(Debug, Clone, Copy)] @@ -55,7 +55,7 @@ impl sqlx::Encode<'_, Sqlite> for Show {  #[derive(Debug, Default)]  pub struct Offline { -    status: Option<String>, +    pub status: Option<String>,  }  #[derive(Debug)] diff --git a/luz/src/roster.rs b/luz/src/roster.rs index e3db00f..0e43a8a 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -27,6 +27,7 @@ pub enum Subscription {      None,      PendingOut,      PendingIn, +    PendingInPendingOut,      OnlyOut,      OnlyIn,      OutPendingIn, @@ -51,6 +52,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {              "none" => Ok(Self::None),              "pending-out" => Ok(Self::PendingOut),              "pending-in" => Ok(Self::PendingIn), +            "pending-in-pending-out" => Ok(Self::PendingInPendingOut),              "only-out" => Ok(Self::OnlyOut),              "only-in" => Ok(Self::OnlyIn),              "out-pending-in" => Ok(Self::OutPendingIn), @@ -70,6 +72,7 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {              Subscription::None => "none",              Subscription::PendingOut => "pending-out",              Subscription::PendingIn => "pending-in", +            Subscription::PendingInPendingOut => "pending-in-pending-out",              Subscription::OnlyOut => "only-out",              Subscription::OnlyIn => "only-in",              Subscription::OutPendingIn => "out-pending-in", | 
