From 51c635626150330ab1a8a63093a03335ba884d56 Mon Sep 17 00:00:00 2001 From: modeco80 Date: Thu, 9 May 2024 00:55:14 -0400 Subject: [PATCH] qmp: split actor into new `actor` module logic seperation, bla bla. --- src/qmp/actor.rs | 226 ++++++++++++++++++++++++++++++++++++++++++++ src/qmp/client.rs | 236 ++-------------------------------------------- src/qmp/mod.rs | 1 + 3 files changed, 234 insertions(+), 229 deletions(-) create mode 100644 src/qmp/actor.rs diff --git a/src/qmp/actor.rs b/src/qmp/actor.rs new file mode 100644 index 0000000..f023372 --- /dev/null +++ b/src/qmp/actor.rs @@ -0,0 +1,226 @@ +use std::collections::HashMap; + +// TODO +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufStream}, + sync::{mpsc, oneshot}, +}; + +use super::{ + io, result, + types::{self, QmpExecute, QmpHandshake}, +}; + +enum QmpActorState { + /// Disconnected. + Disconnected, + + ConnectedToServer, + + /// Handshake passed successfully and we're ready. + Ready, +} + +/// Actor message type. +pub(crate) enum QmpActorMessage { + /// Close + QmpDoClose, + + QmpDoHandshake { + /// The oneshot response. + tx: oneshot::Sender>, + }, + + /// Used to send a QMP message. + QmpDoSend { + /// The message to send + command: String, + arguments: Option, + + /// The oneshot response. + tx: oneshot::Sender, + }, + + /// Used to subscibe to a QMP event + QmpSubscribeToEvent { + event_name: String, + tx: mpsc::Sender, + }, +} + +pub(crate) struct QmpClientActor +where + S: AsyncReadExt + AsyncWriteExt + Unpin + Send, +{ + stream: BufStream, + state: QmpActorState, + + rx: mpsc::Receiver, + + // used to spit out events to subscribed clients + event_tx: HashMap>, + + /// Used for QMP execute messages. This allows us to have multiple commands + /// in flight, and to manually send to all of them. Pretty nifty. + last_response_id: u32, + + /// All in flight responses. + response_tx: HashMap>>, +} + +impl QmpClientActor +where + S: AsyncReadExt + AsyncWriteExt + Unpin + Send, +{ + fn new(stream: S, rx: mpsc::Receiver) -> Self { + Self { + stream: BufStream::new(stream), + state: QmpActorState::ConnectedToServer, + rx: rx, + event_tx: HashMap::new(), + last_response_id: 0, + response_tx: HashMap::new(), + } + } + + async fn handshake(&mut self) -> result::Result { + let handshake_as_string = io::read_qmp_line(&mut self.stream).await?; + let handshake_json = + serde_json::from_str::(handshake_as_string.as_str())?; + + // Now let's send the handshake. If this errors we're done anyways + let handshake_response = serde_json::json!({ + "execute": "qmp_capabilities" + }); + let serialized = serde_json::ser::to_string(&handshake_response)?; + + io::write_qmp_line(&mut self.stream, &serialized).await?; + + // Read what the server responded back + // FIXME: Make sure it's not an error. + let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?; + // let handshake_server_response_json = + // serde_json::from_str::(handshake_server_response_as_string.as_str())?; + + self.state = QmpActorState::Ready; + + Ok(handshake_json) + } + + async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> { + match msg { + QmpActorMessage::QmpDoClose => { + // FIXME: Actually close things. + } + + QmpActorMessage::QmpDoHandshake { tx } => { + let handshake = self.handshake().await; + let _ = tx.send(handshake); + } + + // this can only really fail in a way where it panics i'm pretty sure + QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { + self.event_tx.insert(event_name.to_uppercase(), tx); + }, + + QmpActorMessage::QmpDoSend { command, arguments, tx } => { + self.response_tx.insert(self.last_response_id, Some(tx)); + self.send_execute(command, &arguments).await?; + self.last_response_id += 1; + } + + _ => tracing::error!("Unknown message"), + } + + Ok(()) + } + + async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> { + let res = serde_json::from_str::(line.as_str())?; + + + + + // Check if this is an event + if let Some(event) = res.get("event") { + let str = String::from(event.as_str().unwrap()); + + // Send the event to the subscriber + if let Some(tx) = self.event_tx.get(&str) { + let _ = tx.send(res.clone()).await; + } + } + + // Check for a execute resposne + if let Some(id) = res.get("id") { + let id = id.as_u64().unwrap() as u32; + + // Send the response + if let Some(txopt) = self.response_tx.get_mut(&id) { + let tx = txopt.take().unwrap(); + let _ = tx.send(res.clone()); + self.response_tx.remove(&id); + } + } + + + Ok(()) + } + + async fn send_execute(&mut self, command: String, arguments: &Option) -> result::Result<()> { + // Send + let execute = QmpExecute { + execute: command, + id: self.last_response_id, + arguments: arguments.clone() + }; + + let serialized = serde_json::ser::to_string(&execute)?; + io::write_qmp_line(&mut self.stream, &serialized).await?; + Ok(()) + } + + /// Actor runloop. + async fn run(mut self) -> result::Result<()> { + loop { + tokio::select! { + str = io::read_qmp_line(&mut self.stream) => { + match str { + Ok(val) => { + self.handle_qmp_read(&val).await?; + } + Err(err) => { tracing::error!("stream error {err}"); break } + } + }, + + msg_opt = self.rx.recv() => { + if let Some(msg) = msg_opt { + self.handle_message(msg).await?; + } else { + break + } + } + + else => break + } + } + + Ok(()) + } + + /// Helper function to spawn the actor. + pub(crate) fn spawn_actor(socket: S) -> mpsc::Sender + where + S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, + { + let (tx, rx) = mpsc::channel(8); + let inner = QmpClientActor::new(socket, rx); + tokio::spawn(async move { + let res = inner.run().await; + res.inspect_err(|err| { + tracing::error!("error in actor runloop: {err}"); + }); + }); + tx + } +} diff --git a/src/qmp/client.rs b/src/qmp/client.rs index 41ea7f6..1b7f587 100644 --- a/src/qmp/client.rs +++ b/src/qmp/client.rs @@ -1,233 +1,11 @@ //! QMP client. -//! Uses a broadcast channel. -use std::collections::HashMap; - -use serde_json::json; -// TODO use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufStream}, + io::{AsyncReadExt, AsyncWriteExt}, sync::{mpsc, oneshot}, }; -use super::{ - io, result, - types::{self, QmpExecute, QmpHandshake}, -}; - -enum QmpActorState { - /// Disconnected. - Disconnected, - - ConnectedToServer, - - /// Handshake passed successfully and we're ready. - Ready, -} - -#[derive(Debug)] -pub enum QmpActorMessage { - /// Close - DoClose, - - QmpDoHandshake { - /// The oneshot response. - tx: oneshot::Sender>, - }, - - /// Used to send a QMP message. - QmpDoSend { - /// The message to send - command: String, - arguments: Option, - - /// The oneshot response. - tx: oneshot::Sender, - }, - - /// Used to subscibe to a QMP event - QmpSubscribeToEvent { - event_name: String, - tx: mpsc::Sender, - }, -} - -struct QmpClientActor -where - S: AsyncReadExt + AsyncWriteExt + Unpin + Send, -{ - stream: BufStream, - state: QmpActorState, - - rx: mpsc::Receiver, - - // used to spit out events to subscribed clients - event_tx: HashMap>, - - /// Used for QMP execute messages. This allows us to have multiple commands - /// in flight, and to manually send to all of them. Pretty nifty. - last_response_id: u32, - - /// All in flight responses. - response_tx: HashMap>>, -} - -impl QmpClientActor -where - S: AsyncReadExt + AsyncWriteExt + Unpin + Send, -{ - fn new(stream: S, rx: mpsc::Receiver) -> Self { - Self { - stream: BufStream::new(stream), - state: QmpActorState::ConnectedToServer, - rx: rx, - event_tx: HashMap::new(), - last_response_id: 0, - response_tx: HashMap::new(), - } - } - - async fn handshake(&mut self) -> result::Result { - let handshake_as_string = io::read_qmp_line(&mut self.stream).await?; - let handshake_json = - serde_json::from_str::(handshake_as_string.as_str())?; - - // Now let's send the handshake. If this errors we're done anyways - let handshake_response = serde_json::json!({ - "execute": "qmp_capabilities" - }); - let serialized = serde_json::ser::to_string(&handshake_response)?; - - io::write_qmp_line(&mut self.stream, &serialized).await?; - - // Read what the server responded back - // FIXME: Make sure it's not an error. - let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?; - // let handshake_server_response_json = - // serde_json::from_str::(handshake_server_response_as_string.as_str())?; - - self.state = QmpActorState::Ready; - - Ok(handshake_json) - } - - async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> { - match msg { - QmpActorMessage::DoClose => { - // FIXME: Actually close things. - } - - QmpActorMessage::QmpDoHandshake { tx } => { - let handshake = self.handshake().await; - let _ = tx.send(handshake); - } - - // this can only really fail in a way where it panics i'm pretty sure - QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { - self.event_tx.insert(event_name.to_uppercase(), tx); - }, - - QmpActorMessage::QmpDoSend { command, arguments, tx } => { - self.response_tx.insert(self.last_response_id, Some(tx)); - self.send_execute(command, &arguments).await?; - self.last_response_id += 1; - } - - _ => tracing::error!("Unknown message"), - } - - Ok(()) - } - - async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> { - let res = serde_json::from_str::(line.as_str())?; - - - - - // Check if this is an event - if let Some(event) = res.get("event") { - let str = String::from(event.as_str().unwrap()); - - // Send the event to the subscriber - if let Some(tx) = self.event_tx.get(&str) { - let _ = tx.send(res.clone()).await; - } - } - - // Check for a execute resposne - if let Some(id) = res.get("id") { - let id = id.as_u64().unwrap() as u32; - - // Send the response - if let Some(txopt) = self.response_tx.get_mut(&id) { - let tx = txopt.take().unwrap(); - let _ = tx.send(res.clone()); - self.response_tx.remove(&id); - } - } - - - Ok(()) - } - - async fn send_execute(&mut self, command: String, arguments: &Option) -> result::Result<()> { - // Send - let execute = QmpExecute { - execute: command, - id: self.last_response_id, - arguments: arguments.clone() - }; - - let serialized = serde_json::ser::to_string(&execute)?; - io::write_qmp_line(&mut self.stream, &serialized).await?; - Ok(()) - } - - /// Actor runloop. - async fn run(mut self) -> result::Result<()> { - loop { - tokio::select! { - str = io::read_qmp_line(&mut self.stream) => { - match str { - Ok(val) => { - self.handle_qmp_read(&val).await?; - } - Err(err) => { tracing::error!("stream error {err}"); break } - } - }, - - msg_opt = self.rx.recv() => { - if let Some(msg) = msg_opt { - self.handle_message(msg).await?; - } else { - break - } - } - - else => break - } - } - - Ok(()) - } - - /// Helper function to spawn the actor. - fn spawn_actor(socket: S) -> mpsc::Sender - where - S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, - { - let (tx, rx) = mpsc::channel(8); - let inner = QmpClientActor::new(socket, rx); - tokio::spawn(async move { - let res = inner.run().await; - res.inspect_err(|err| { - tracing::error!("error in actor runloop: {err}"); - }); - }); - tx - } -} +use super::{result, actor, types}; /// A client for the Qemu Machine Protocol (also known as QMP). /// Generic so it works with any Tokio stream type (which fits QEMU's ability to run @@ -238,7 +16,7 @@ where /// it's all in here. #[derive(Clone)] pub struct QmpClient { - tx: mpsc::Sender, + tx: mpsc::Sender, } impl QmpClient { @@ -251,7 +29,7 @@ impl QmpClient { S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, { Self { - tx: QmpClientActor::spawn_actor(socket), + tx: actor::QmpClientActor::spawn_actor(socket), } } @@ -262,7 +40,7 @@ impl QmpClient { let _ = self .tx - .send(QmpActorMessage::QmpDoHandshake { tx: tx }) + .send(actor::QmpActorMessage::QmpDoHandshake { tx: tx }) .await; match rx.await { Ok(res) => res, @@ -276,7 +54,7 @@ impl QmpClient { let _ = self .tx - .send(QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx }) + .send(actor::QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx }) .await; match rx.await { @@ -295,7 +73,7 @@ impl QmpClient { let _ = self .tx - .send(QmpActorMessage::QmpSubscribeToEvent { + .send(actor::QmpActorMessage::QmpSubscribeToEvent { event_name: event_name, tx: tx, }) diff --git a/src/qmp/mod.rs b/src/qmp/mod.rs index df95a50..9f9e1f2 100644 --- a/src/qmp/mod.rs +++ b/src/qmp/mod.rs @@ -3,5 +3,6 @@ pub(crate) mod types; pub mod result; pub(crate) mod io; +pub(crate) mod actor; pub mod client; pub use client::*; \ No newline at end of file