qmp: split actor into new actor module

logic seperation, bla bla.
This commit is contained in:
Lily Tsuru 2024-05-09 00:55:14 -04:00
parent a37d2468c2
commit 51c6356261
3 changed files with 234 additions and 229 deletions

226
src/qmp/actor.rs Normal file
View file

@ -0,0 +1,226 @@
use std::collections::HashMap;
// TODO
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufStream},
sync::{mpsc, oneshot},
};
use super::{
io, result,
types::{self, QmpExecute, QmpHandshake},
};
enum QmpActorState {
/// Disconnected.
Disconnected,
ConnectedToServer,
/// Handshake passed successfully and we're ready.
Ready,
}
/// Actor message type.
pub(crate) enum QmpActorMessage {
/// Close
QmpDoClose,
QmpDoHandshake {
/// The oneshot response.
tx: oneshot::Sender<super::result::Result<QmpHandshake>>,
},
/// Used to send a QMP message.
QmpDoSend {
/// The message to send
command: String,
arguments: Option<serde_json::Value>,
/// The oneshot response.
tx: oneshot::Sender<serde_json::Value>,
},
/// Used to subscibe to a QMP event
QmpSubscribeToEvent {
event_name: String,
tx: mpsc::Sender<serde_json::Value>,
},
}
pub(crate) struct QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
stream: BufStream<S>,
state: QmpActorState,
rx: mpsc::Receiver<QmpActorMessage>,
// used to spit out events to subscribed clients
event_tx: HashMap<String, mpsc::Sender<serde_json::Value>>,
/// Used for QMP execute messages. This allows us to have multiple commands
/// in flight, and to manually send to all of them. Pretty nifty.
last_response_id: u32,
/// All in flight responses.
response_tx: HashMap<u32, Option<oneshot::Sender<serde_json::Value>>>,
}
impl<S> QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self {
Self {
stream: BufStream::new(stream),
state: QmpActorState::ConnectedToServer,
rx: rx,
event_tx: HashMap::new(),
last_response_id: 0,
response_tx: HashMap::new(),
}
}
async fn handshake(&mut self) -> result::Result<types::QmpHandshake> {
let handshake_as_string = io::read_qmp_line(&mut self.stream).await?;
let handshake_json =
serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?;
// Now let's send the handshake. If this errors we're done anyways
let handshake_response = serde_json::json!({
"execute": "qmp_capabilities"
});
let serialized = serde_json::ser::to_string(&handshake_response)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
// Read what the server responded back
// FIXME: Make sure it's not an error.
let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?;
// let handshake_server_response_json =
// serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?;
self.state = QmpActorState::Ready;
Ok(handshake_json)
}
async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
match msg {
QmpActorMessage::QmpDoClose => {
// FIXME: Actually close things.
}
QmpActorMessage::QmpDoHandshake { tx } => {
let handshake = self.handshake().await;
let _ = tx.send(handshake);
}
// this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx);
},
QmpActorMessage::QmpDoSend { command, arguments, tx } => {
self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?;
self.last_response_id += 1;
}
_ => tracing::error!("Unknown message"),
}
Ok(())
}
async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> {
let res = serde_json::from_str::<serde_json::Value>(line.as_str())?;
// Check if this is an event
if let Some(event) = res.get("event") {
let str = String::from(event.as_str().unwrap());
// Send the event to the subscriber
if let Some(tx) = self.event_tx.get(&str) {
let _ = tx.send(res.clone()).await;
}
}
// Check for a execute resposne
if let Some(id) = res.get("id") {
let id = id.as_u64().unwrap() as u32;
// Send the response
if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap();
let _ = tx.send(res.clone());
self.response_tx.remove(&id);
}
}
Ok(())
}
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> {
// Send
let execute = QmpExecute {
execute: command,
id: self.last_response_id,
arguments: arguments.clone()
};
let serialized = serde_json::ser::to_string(&execute)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
Ok(())
}
/// Actor runloop.
async fn run(mut self) -> result::Result<()> {
loop {
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
self.handle_qmp_read(&val).await?;
}
Err(err) => { tracing::error!("stream error {err}"); break }
}
},
msg_opt = self.rx.recv() => {
if let Some(msg) = msg_opt {
self.handle_message(msg).await?;
} else {
break
}
}
else => break
}
}
Ok(())
}
/// Helper function to spawn the actor.
pub(crate) fn spawn_actor(socket: S) -> mpsc::Sender<QmpActorMessage>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move {
let res = inner.run().await;
res.inspect_err(|err| {
tracing::error!("error in actor runloop: {err}");
});
});
tx
}
}

View file

@ -1,233 +1,11 @@
//! QMP client. //! QMP client.
//! Uses a broadcast channel.
use std::collections::HashMap;
use serde_json::json;
// TODO
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufStream}, io::{AsyncReadExt, AsyncWriteExt},
sync::{mpsc, oneshot}, sync::{mpsc, oneshot},
}; };
use super::{ use super::{result, actor, types};
io, result,
types::{self, QmpExecute, QmpHandshake},
};
enum QmpActorState {
/// Disconnected.
Disconnected,
ConnectedToServer,
/// Handshake passed successfully and we're ready.
Ready,
}
#[derive(Debug)]
pub enum QmpActorMessage {
/// Close
DoClose,
QmpDoHandshake {
/// The oneshot response.
tx: oneshot::Sender<super::result::Result<QmpHandshake>>,
},
/// Used to send a QMP message.
QmpDoSend {
/// The message to send
command: String,
arguments: Option<serde_json::Value>,
/// The oneshot response.
tx: oneshot::Sender<serde_json::Value>,
},
/// Used to subscibe to a QMP event
QmpSubscribeToEvent {
event_name: String,
tx: mpsc::Sender<serde_json::Value>,
},
}
struct QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
stream: BufStream<S>,
state: QmpActorState,
rx: mpsc::Receiver<QmpActorMessage>,
// used to spit out events to subscribed clients
event_tx: HashMap<String, mpsc::Sender<serde_json::Value>>,
/// Used for QMP execute messages. This allows us to have multiple commands
/// in flight, and to manually send to all of them. Pretty nifty.
last_response_id: u32,
/// All in flight responses.
response_tx: HashMap<u32, Option<oneshot::Sender<serde_json::Value>>>,
}
impl<S> QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self {
Self {
stream: BufStream::new(stream),
state: QmpActorState::ConnectedToServer,
rx: rx,
event_tx: HashMap::new(),
last_response_id: 0,
response_tx: HashMap::new(),
}
}
async fn handshake(&mut self) -> result::Result<types::QmpHandshake> {
let handshake_as_string = io::read_qmp_line(&mut self.stream).await?;
let handshake_json =
serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?;
// Now let's send the handshake. If this errors we're done anyways
let handshake_response = serde_json::json!({
"execute": "qmp_capabilities"
});
let serialized = serde_json::ser::to_string(&handshake_response)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
// Read what the server responded back
// FIXME: Make sure it's not an error.
let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?;
// let handshake_server_response_json =
// serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?;
self.state = QmpActorState::Ready;
Ok(handshake_json)
}
async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
match msg {
QmpActorMessage::DoClose => {
// FIXME: Actually close things.
}
QmpActorMessage::QmpDoHandshake { tx } => {
let handshake = self.handshake().await;
let _ = tx.send(handshake);
}
// this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx);
},
QmpActorMessage::QmpDoSend { command, arguments, tx } => {
self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?;
self.last_response_id += 1;
}
_ => tracing::error!("Unknown message"),
}
Ok(())
}
async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> {
let res = serde_json::from_str::<serde_json::Value>(line.as_str())?;
// Check if this is an event
if let Some(event) = res.get("event") {
let str = String::from(event.as_str().unwrap());
// Send the event to the subscriber
if let Some(tx) = self.event_tx.get(&str) {
let _ = tx.send(res.clone()).await;
}
}
// Check for a execute resposne
if let Some(id) = res.get("id") {
let id = id.as_u64().unwrap() as u32;
// Send the response
if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap();
let _ = tx.send(res.clone());
self.response_tx.remove(&id);
}
}
Ok(())
}
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> {
// Send
let execute = QmpExecute {
execute: command,
id: self.last_response_id,
arguments: arguments.clone()
};
let serialized = serde_json::ser::to_string(&execute)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
Ok(())
}
/// Actor runloop.
async fn run(mut self) -> result::Result<()> {
loop {
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
self.handle_qmp_read(&val).await?;
}
Err(err) => { tracing::error!("stream error {err}"); break }
}
},
msg_opt = self.rx.recv() => {
if let Some(msg) = msg_opt {
self.handle_message(msg).await?;
} else {
break
}
}
else => break
}
}
Ok(())
}
/// Helper function to spawn the actor.
fn spawn_actor(socket: S) -> mpsc::Sender<QmpActorMessage>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move {
let res = inner.run().await;
res.inspect_err(|err| {
tracing::error!("error in actor runloop: {err}");
});
});
tx
}
}
/// A client for the Qemu Machine Protocol (also known as QMP). /// A client for the Qemu Machine Protocol (also known as QMP).
/// Generic so it works with any Tokio stream type (which fits QEMU's ability to run /// Generic so it works with any Tokio stream type (which fits QEMU's ability to run
@ -238,7 +16,7 @@ where
/// it's all in here. /// it's all in here.
#[derive(Clone)] #[derive(Clone)]
pub struct QmpClient { pub struct QmpClient {
tx: mpsc::Sender<QmpActorMessage>, tx: mpsc::Sender<actor::QmpActorMessage>,
} }
impl QmpClient { impl QmpClient {
@ -251,7 +29,7 @@ impl QmpClient {
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{ {
Self { Self {
tx: QmpClientActor::spawn_actor(socket), tx: actor::QmpClientActor::spawn_actor(socket),
} }
} }
@ -262,7 +40,7 @@ impl QmpClient {
let _ = self let _ = self
.tx .tx
.send(QmpActorMessage::QmpDoHandshake { tx: tx }) .send(actor::QmpActorMessage::QmpDoHandshake { tx: tx })
.await; .await;
match rx.await { match rx.await {
Ok(res) => res, Ok(res) => res,
@ -276,7 +54,7 @@ impl QmpClient {
let _ = self let _ = self
.tx .tx
.send(QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx }) .send(actor::QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx })
.await; .await;
match rx.await { match rx.await {
@ -295,7 +73,7 @@ impl QmpClient {
let _ = self let _ = self
.tx .tx
.send(QmpActorMessage::QmpSubscribeToEvent { .send(actor::QmpActorMessage::QmpSubscribeToEvent {
event_name: event_name, event_name: event_name,
tx: tx, tx: tx,
}) })

View file

@ -3,5 +3,6 @@
pub(crate) mod types; pub(crate) mod types;
pub mod result; pub mod result;
pub(crate) mod io; pub(crate) mod io;
pub(crate) mod actor;
pub mod client; pub mod client;
pub use client::*; pub use client::*;