aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-08 02:15:44 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-08 02:15:44 +0000
commita1d96233e816e2b8378a629c6cc9e34028f2435b (patch)
tree5c115c883dec61221af1d692705446744a623fed
parentb023c6b5f214759a53b5c118b00305925294ee7d (diff)
downloadluz-a1d96233e816e2b8378a629c6cc9e34028f2435b.tar.gz
luz-a1d96233e816e2b8378a629c6cc9e34028f2435b.tar.bz2
luz-a1d96233e816e2b8378a629c6cc9e34028f2435b.zip
WIP: luz actor-based client
-rw-r--r--luz/Cargo.toml1
-rw-r--r--luz/src/lib.rs700
2 files changed, 472 insertions, 229 deletions
diff --git a/luz/Cargo.toml b/luz/Cargo.toml
index a842142..6b2066b 100644
--- a/luz/Cargo.toml
+++ b/luz/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
futures = "0.3.31"
jabber = { version = "0.1.0", path = "../jabber" }
+peanuts = { version = "0.1.0", path = "../../peanuts" }
jid = { version = "0.1.0", path = "../jid" }
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
stanza = { version = "0.1.0", path = "../stanza" }
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 79d9352..6372fe0 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -2,7 +2,7 @@ use std::{
collections::{HashMap, HashSet, VecDeque},
fmt::Pointer,
pin::pin,
- sync::Arc,
+ sync::{atomic::AtomicBool, Arc},
task::{ready, Poll},
};
@@ -28,239 +28,528 @@ use tokio::{
select,
sync::{
mpsc::{self, Receiver, Sender},
- Mutex,
+ oneshot, Mutex,
},
task::{JoinHandle, JoinSet},
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::{PollSendError, PollSender};
-pub struct Luz {
- receiver: ReceiverStream<UpdateMessage>,
- sender: PollSender<CommandMessage>,
- // TODO: oneshot
- pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
- db: SqlitePool,
- tasks: JoinSet<()>,
- jid: JID,
+// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
+pub struct JabberWriter {
+ stanza_receiver: mpsc::Receiver<JabberWrite>,
+ control_receiver: mpsc::Receiver<JabberWriterControl>,
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
}
-impl Luz {
- pub async fn new() {}
+struct JabberWrite {
+ stanza: Stanza,
+ respond_to: oneshot::Sender<Result<(), Error>>,
+}
- pub async fn supervisor(
- mut read_sender: Sender<UpdateMessage>,
- mut write_receiver: Receiver<CommandMessage>,
- pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
- db: SqlitePool,
- jid: &JID,
- ) {
- let connection: Arc<
- Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>,
- > = Arc::new(Mutex::new(None));
- let mut jid = jid.clone();
- let mut server = jid.domainpart.clone();
- let tasks: Arc<Mutex<JoinSet<()>>> = Arc::new(Mutex::new(JoinSet::new()));
- for command in write_receiver.recv().await {
- match command {
- CommandMessage::Connect(_) => todo!(),
- CommandMessage::Disconnect => todo!(),
- CommandMessage::Kill => todo!(),
- CommandMessage::GetRoster => todo!(),
- CommandMessage::SendMessage(jid, _) => todo!(),
- }
- tasks.lock().await.spawn(handle_command(connection.clone()));
- }
- }
+enum JabberWriterControl {
+ Disconnect,
+ Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
}
-pub async fn handle_command(
- connection: Arc<Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>>,
-) {
- match command {
- CommandMessage::Connect(password) => {
- let streams = jabber::connect_and_login(&mut jid, password, &mut server)
- .await
- .unwrap();
- let (read, write) = streams.split();
- let (sender, receiver) = mpsc::channel(20);
- let reads = tokio::spawn(jabber_reads(read, read_sender.clone(), db.clone()));
- let writes = tokio::spawn(jabber_writes(write, receiver, db.clone()));
- // properly handle previous connection
- *connection.lock().await = Some((reads, (writes, sender)));
+impl JabberWriter {
+ fn new(
+ stanza_receiver: mpsc::Receiver<JabberWrite>,
+ control_receiver: mpsc::Receiver<JabberWriterControl>,
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
+ ) -> Self {
+ Self {
+ stanza_receiver,
+ control_receiver,
+ stream,
+ on_crash: supervisor,
}
- CommandMessage::Disconnect => {
- if let Some((reads, (writes, write_channel))) = connection.lock().await.take() {
- write_channel.send(CommandMessage::Disconnect).await;
- writes.await;
- reads.await;
- jid = jid.as_bare();
+ }
+
+ async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
+ Ok(self.stream.write(stanza).await?)
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ JabberWriterControl::Disconnect => {
+ // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
+ self.stanza_receiver.close();
+ // TODO: put this in some kind of function to avoid code duplication
+ for msg in self.stanza_receiver.recv().await {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ self.on_crash.send((msg, self.stanza_receiver));
+ break;
+ }
+ _ => {
+ msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ msg.respond_to.send(Ok(()));
+ }
+ }
+ }
+ self.stream.try_close().await;
+ break;
+ },
+ JabberWriterControl::Abort(sender) => {
+ sender.send(self.stanza_receiver);
+ break;
+ },
+ }
+ },
+ Some(msg) = self.stanza_receiver.recv() => {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ self.on_crash.send((msg, self.stanza_receiver));
+ break;
+ }
+ _ => {
+ msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ msg.respond_to.send(Ok(()));
+ }
+ }
+ },
+ // TODO: check if this is ok to do
+ else => break,
}
}
- CommandMessage::GetRoster => match *connection.lock().await {
- Some(_) => todo!(),
- None => todo!(),
- },
- CommandMessage::SendMessage(jid, _) => todo!(),
}
}
-pub async fn jabber_writes(
- mut jabber_writer: BoundJabberWriter<Tls>,
- mut write_receiver: Receiver<CommandMessage>,
- /*error_sender: Sender<UpdateMessage>,*/ db: SqlitePool,
-) {
+#[derive(Clone)]
+pub struct JabberWriteHandle {
+ sender: mpsc::Sender<JabberWrite>,
}
-pub async fn jabber_reads<S>(
- mut read: BoundJabberReader<S>,
- read_sender: Sender<UpdateMessage>,
- db: SqlitePool,
-) where
- S: AsyncRead + Unpin + Sync,
-{
- while let Ok(stanza) = read.read::<Stanza>().await {
- let upd = match stanza {
- Stanza::Message(message) => todo!(),
- Stanza::Presence(presence) => todo!(),
- Stanza::Iq(iq) => match iq.r#type {
- IqType::Error => todo!(),
- IqType::Get => todo!(),
- IqType::Result => match iq.query {
- Some(q) => match q {
- Query::Bind(bind) => todo!(),
- Query::Ping(ping) => todo!(),
- Query::Roster(query) => UpdateMessage::Roster(query.items),
- Query::Unsupported => todo!(),
- },
- None => todo!(),
- },
- IqType::Set => todo!(),
+pub struct JabberWriterControlHandle {
+ sender: mpsc::Sender<JabberWriterControl>,
+ handle: JoinHandle<()>,
+}
+
+impl JabberWriterControlHandle {
+ pub fn new(
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
+ ) -> (JabberWriteHandle, JabberWriterControlHandle) {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+ let (stanza_sender, stanza_receiver) = mpsc::channel(20);
+
+ let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ JabberWriteHandle {
+ sender: stanza_sender,
+ },
+ Self {
+ sender: control_sender,
+ handle,
},
- Stanza::Error(error) => todo!(),
- Stanza::OtherContent(content) => todo!(),
- };
- read_sender.send(upd).await.expect("read_sender");
+ )
}
- todo!()
-}
-
-pub enum JabberWriteCommand {}
-
-pub async fn funnel(
- mut jabber_reads: Receiver<UpdateMessage>,
- mut db_reads: Receiver<UpdateMessage>,
- out: Sender<UpdateMessage>,
-) {
- select! {
- j = jabber_reads.recv() => {
- out.send(j.unwrap()).await;
- },
- d = db_reads.recv() => {
- out.send(d.unwrap()).await;
+
+ pub fn reconnect(
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
+ stanza_receiver: mpsc::Receiver<JabberWrite>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
}
}
}
-pub async fn write_thread(mut write: BoundJabberWriter<S>, write_recv: Receiver<CommandMessage>) {
- while let Some(cmd) = write_recv.recv().await {
- match cmd {
- CommandMessage::GetRoster => todo!(),
- CommandMessage::SendMessage(jid, _) => todo!(),
- CommandMessage::Connect => continue,
- CommandMessage::Disconnect => write_thread_disconnected(write_recv),
- }
- }
- todo!()
+pub struct JabberReader {
+ // TODO: place iq hashmap here
+ control_receiver: mpsc::Receiver<JabberReaderControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
+ write_sender: mpsc::Sender<JabberWrite>,
+ tasks: JoinSet<()>,
}
-impl Client {
- pub fn send_command(self: Arc<Self>, cmd: CommandMessage) {
- match cmd {
- CommandMessage::Connect => todo!(),
- CommandMessage::Disconnect => todo!(),
- CommandMessage::GetRoster => self.get_roster(),
- CommandMessage::SendMessage(jid, _) => todo!(),
+impl JabberReader {
+ fn new(
+ control_receiver: mpsc::Receiver<JabberReaderControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
+ write_sender: mpsc::Sender<JabberWrite>,
+ ) -> Self {
+ Self {
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ write_sender,
+ tasks: JoinSet::new(),
}
}
- async fn begin_query_roster(self: Arc<Self>) -> Result<()> {
- if self.connected {
- self.db.start_get_roster().await;
- self.write_chan.send(CommandMessage::GetRoster).await;
- } else {
- let roster = self.db.get_roster();
- self.db_chan.send(roster).await;
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ JabberReaderControl::Disconnect => todo!(),
+ JabberReaderControl::Abort(sender) => todo!(),
+ };
+ },
+ stanza = self.stream.read::<Stanza>() => {
+ match stanza {
+ Ok(_) => todo!(),
+ Err(_) => todo!(),
+ }
+ self.tasks.spawn();
+ },
+ else => break
+ }
}
}
}
-pub struct ReadStreamThread {
- // held by both read thread and write thread
- pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
- tasks: JoinSet<()>,
+trait Task {
+ async fn handle();
+}
+
+impl Task for Stanza {
+ async fn handle() {
+ todo!()
+ }
}
-pub struct WriteStreamThread {
- pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
+enum JabberReaderControl {
+ Disconnect,
+ Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
+}
+
+struct JabberReaderControlHandle {
+ sender: mpsc::Sender<JabberReaderControl>,
+ handle: JoinHandle<()>,
+}
+
+impl JabberReaderControlHandle {
+ pub fn new(
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
+ jabber_write: mpsc::Sender<JabberWrite>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = JabberReader::new(
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ jabber_write,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
+
+pub struct Luz {
+ receiver: mpsc::Receiver<CommandMessage>,
+ jid: Arc<Mutex<JID>>,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ password: String,
+ connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
tasks: JoinSet<()>,
}
impl Luz {
- // pub async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
- // let client = Client::new(jid, dburl).await?;
- // let luz = Arc::new(Mutex::new(client));
- // client
- // .tasks
- // .spawn(Client::read_thread(luz.clone(), read_sender));
- // client.tasks.spawn(Client::write_thread(write_receiver));
- // let luz = luz.into_inner();
- // Ok(Luz(luz))
- // }
-}
-
-// instead of having a channel to send stanzas down, have a channel to send channels that you can send stanzas down, so if one channel fails (through disconnection, etc), you can move on to the next. could also have an enum to let them sit until reconnection.
-impl Client {
- async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
- let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
- let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
- let jid: JID = jid.try_into()?;
- let server = jid.domainpart.clone();
- let db = SqlitePool::connect(dburl).await?;
- // let bound_streams = jabber::connect_and_login(&mut jid, password, &mut server).await?;
- // let (read, write) = bound_streams.split();
- // let streams = SplitStreams { read: Mutex::new(read), write: Mutex::new(write) };
- let mut tasks = JoinSet::new();
- let client = Self {
- receiver: ReceiverStream::new(read_receiver),
- sender: PollSender::new(write_sender),
- pending_iqs: HashMap::new(),
- db,
+ fn new(
+ receiver: mpsc::Receiver<CommandMessage>,
+ jid: Arc<Mutex<JID>>,
+ password: String,
+ connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ ) -> Self {
+ Self {
jid,
- server,
- // TODO: check for drop abort handles
- tasks,
- };
- // store handles in client
- // send connection streams down channel created here
- Ok(client)
+ password,
+ connected,
+ db,
+ receiver,
+ sender,
+ tasks: JoinSet::new(),
+ }
}
- pub async fn read_thread(client: Arc<Mutex<Self>>, sender: mpsc::Sender<UpdateMessage>) {}
+ async fn run(mut self) {
+ while let Some(msg) = self.receiver.recv().await {
+ // TODO: consider separating disconnect/connect and commands apart from commandmessage
+ match msg {
+ CommandMessage::Connect => {
+ match self.connected.lock().await.as_ref() {
+ Some(_) => {
+ self.sender
+ .send(UpdateMessage::Error(Error::AlreadyConnected))
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.lock().await;
+ let mut domain = jid.domainpart.clone();
+ let streams_result =
+ jabber::connect_and_login(&mut jid, &self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ let (writer, supervisor) =
+ JabberSupervisorHandle::new(s, self.sender.clone());
+ *self.connected.lock().await = Some((writer, supervisor));
+ }
+ Err(e) => {
+ self.sender.send(UpdateMessage::Error(e.into()));
+ }
+ }
+ }
+ };
+ }
+ CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
+ None => {
+ self.sender
+ .send(UpdateMessage::Error(Error::AlreadyDisonnected))
+ .await;
+ }
+ mut c => {
+ if let Some((_write_handle, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle
+ .sender
+ .send(JabberSupervisorCommand::Disconnect)
+ .await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ _ => {
+ match self.connected.lock().await.as_ref() {
+ Some((w, _)) => self.tasks.spawn(msg.handle_online(
+ w.clone(),
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ )),
+ None => self.tasks.spawn(msg.handle_offline(
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ )),
+ };
+ }
+ }
+ }
+ }
+}
- pub async fn write_thread(receiver: mpsc::Receiver<CommandMessage>) {
- for message in receiver.recv().await {}
+impl CommandMessage {
+ pub async fn handle_offline(
+ mut self,
+ jid: Arc<Mutex<JID>>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ ) {
+ todo!()
}
- pub async fn connect(&mut self, password: &str) -> Result<(), Error> {
+ pub async fn handle_online(
+ mut self,
+ jabber_write_handle: JabberWriteHandle,
+ // TODO: jid could lose resource by the end
+ jid: Arc<Mutex<JID>>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ ) {
todo!()
}
}
+// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
+pub struct LuzHandle {
+ sender: mpsc::Sender<CommandMessage>,
+ receiver: mpsc::Receiver<UpdateMessage>,
+}
+
+impl LuzHandle {
+ pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (update_sender, update_receiver) = mpsc::channel(20);
+
+ let actor = Luz::new(
+ command_receiver,
+ Arc::new(Mutex::new(jid)),
+ password,
+ Arc::new(Mutex::new(None)),
+ db,
+ update_sender,
+ );
+ tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: command_sender,
+ receiver: update_receiver,
+ }
+ }
+}
+
+pub struct JabberSupervisor {
+ connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
+ writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
+ reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ sender: mpsc::Sender<UpdateMessage>,
+ writer_handle: JabberWriterControlHandle,
+ reader_handle: JabberReaderControlHandle,
+}
+
+pub enum JabberSupervisorCommand {
+ Disconnect,
+}
+
+impl JabberSupervisor {
+ fn new(
+ connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
+ writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
+ reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ sender: mpsc::Sender<UpdateMessage>,
+ writer_handle: JabberWriterControlHandle,
+ reader_handle: JabberReaderControlHandle,
+ ) -> Self {
+ Self {
+ connection_commands,
+ writer_crash,
+ sender,
+ writer_handle,
+ reader_handle,
+ reader_crash,
+ }
+ }
+
+ async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.connection_commands.recv() => {
+ self.handle_command_message(msg).await;
+ },
+ error = self.writer_crash => {
+
+ },
+ error = self.reader_crash => {
+
+ },
+ }
+ }
+ }
+}
+
+pub struct JabberSupervisorHandle {
+ sender: mpsc::Sender<JabberSupervisorCommand>,
+ handle: JoinHandle<()>,
+}
+
+impl JabberSupervisorHandle {
+ pub fn new(
+ streams: BoundJabberStream<Tls>,
+ sender: mpsc::Sender<UpdateMessage>,
+ db: SqlitePool,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ ) -> (JabberWriteHandle, Self) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (writer_error_sender, writer_error_receiver) = oneshot::channel();
+ let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
+
+ let (reader, writer) = streams.split();
+ let (jabber_write_handle, jabber_writer_control_handle) =
+ JabberWriterControlHandle::new(writer, writer_error_sender);
+ let jabber_reader_control_handle = JabberReaderControlHandle::new(
+ reader,
+ reader_crash_sender,
+ db,
+ update_sender,
+ command_sender.clone(),
+ jabber_write_handle.sender.clone(),
+ );
+
+ let actor = JabberSupervisor::new(
+ command_receiver,
+ writer_error_receiver,
+ reader_crash_receiver,
+ sender,
+ jabber_writer_control_handle,
+ jabber_reader_control_handle,
+ );
+
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ jabber_write_handle,
+ Self {
+ sender: command_sender,
+ handle,
+ },
+ )
+ }
+}
+
pub enum Error {
+ AlreadyConnected,
PollSend(PollSendError<CommandMessage>),
Jabber(jabber::Error),
+ XML(peanuts::Error),
SQL(sqlx::Error),
JID(jid::ParseError),
+ AlreadyDisonnected,
+}
+
+impl From<peanuts::Error> for Error {
+ fn from(e: peanuts::Error) -> Self {
+ Self::XML(e)
+ }
}
impl From<jid::ParseError> for Error {
@@ -281,17 +570,6 @@ impl From<jabber::Error> for Error {
}
}
-impl Stream for Client {
- type Item = UpdateMessage;
-
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- pin!(self).receiver.poll_next_unpin(cx)
- }
-}
-
impl From<PollSendError<CommandMessage>> for Error {
fn from(e: PollSendError<CommandMessage>) -> Self {
Self::PollSend(e)
@@ -299,7 +577,7 @@ impl From<PollSendError<CommandMessage>> for Error {
}
pub enum CommandMessage {
- Connect(String),
+ Connect,
Disconnect,
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
GetRoster,
@@ -307,42 +585,6 @@ pub enum CommandMessage {
}
pub enum UpdateMessage {
+ Error(Error),
Roster(Vec<roster::Item>),
}
-
-impl Client {
- pub async fn process_stanza(
- &mut self,
- stanza: Result<Stanza, jabber::Error>,
- sender: mpsc::Sender<UpdateMessage>,
- ) {
- match stanza {
- Ok(stanza) => todo!(),
- Err(e) => self.process_error(e),
- }
- }
-
- pub async fn iq(
- &mut self,
- to: Option<JID>,
- r#type: IqType,
- query: Option<Query>,
- ) -> Result<IqResponse, Error> {
- self.client
- .send(Stanza::Iq(Iq {
- from: Some(self.client.jid()),
- // TODO: generate id
- id: "test".to_string(),
- to,
- r#type,
- // TODO: lang
- lang: None,
- query,
- errors: Vec::new(),
- }))
- .await?;
- Ok(todo!())
- }
-
- pub async fn iq_process(&mut self, iq: Iq) {}
-}