qmp: cleanup pass + executing commands works

Some more helpers (like, `execute_hmp("info block".into())` might be nice to add, for now this works.
This commit is contained in:
Lily Tsuru 2024-05-09 00:48:42 -04:00
parent 4cda248591
commit a37d2468c2
6 changed files with 155 additions and 57 deletions

View file

@ -11,8 +11,8 @@ async fn main() {
tracing::subscriber::set_global_default(subscriber).expect("You Banned"); tracing::subscriber::set_global_default(subscriber).expect("You Banned");
// This is test code..
// Create a stream to connect
let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock") let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock")
.await .await
.expect("Could not connect"); .expect("Could not connect");
@ -20,15 +20,24 @@ async fn main() {
let client = qmp::QmpClient::new(stream); let client = qmp::QmpClient::new(stream);
// Handshake QMP
client.handshake().await.expect("Could not handshake QMP"); client.handshake().await.expect("Could not handshake QMP");
println!("handshook QMP"); println!("handshook QMP");
// let's try to get all RESET events // let's try to get all STOP events from QMP now.
let mut rx = client.subscribe_to_event("RESET".into()).await.unwrap(); let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();
// If this worked, then now we can recieve all the events.
// This code here allows running a VM with the -no-shutdown option,
// automatially resetting it on shutdown.
while let Some(message) = rx.recv().await { while let Some(message) = rx.recv().await {
println!("Got event! {:?}", message); println!("Got stop event! {:?}", message);
let res1 = client.execute("system_reset".into(), None).await.expect("FUCK");
let res2 = client.execute("cont".into(), None).await.expect("FUCK");
println!("Result of running: {:?}, {:?}", res1, res2);
} }

View file

@ -3,29 +3,24 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration; use serde_json::json;
// TODO // TODO
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}, io::{AsyncReadExt, AsyncWriteExt, BufStream},
sync::{mpsc, mpsc::error::TryRecvError, oneshot}, sync::{mpsc, oneshot},
}; };
use super::{ use super::{
result, io, result,
io, types::{self, QmpExecute, QmpHandshake},
types::{self, QmpHandshake},
}; };
enum QmpClientState { enum QmpActorState {
/// Disconnected. /// Disconnected.
Disconnected, Disconnected,
ConnectedToServer, ConnectedToServer,
/// The server
HandshakingServer,
/// Handshake passed successfully and we're ready. /// Handshake passed successfully and we're ready.
Ready, Ready,
} }
@ -42,6 +37,10 @@ pub enum QmpActorMessage {
/// Used to send a QMP message. /// Used to send a QMP message.
QmpDoSend { QmpDoSend {
/// The message to send
command: String,
arguments: Option<serde_json::Value>,
/// The oneshot response. /// The oneshot response.
tx: oneshot::Sender<serde_json::Value>, tx: oneshot::Sender<serde_json::Value>,
}, },
@ -53,14 +52,12 @@ pub enum QmpActorMessage {
}, },
} }
struct QmpClientActor<S> struct QmpClientActor<S>
where where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send, S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{ {
stream: BufStream<S>, stream: BufStream<S>,
state: QmpClientState, state: QmpActorState,
rx: mpsc::Receiver<QmpActorMessage>, rx: mpsc::Receiver<QmpActorMessage>,
@ -72,7 +69,7 @@ where
last_response_id: u32, last_response_id: u32,
/// All in flight responses. /// All in flight responses.
response_tx: HashMap<u32, oneshot::Sender<serde_json::Value>>, response_tx: HashMap<u32, Option<oneshot::Sender<serde_json::Value>>>,
} }
impl<S> QmpClientActor<S> impl<S> QmpClientActor<S>
@ -82,7 +79,7 @@ where
fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self { fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self {
Self { Self {
stream: BufStream::new(stream), stream: BufStream::new(stream),
state: QmpClientState::ConnectedToServer, state: QmpActorState::ConnectedToServer,
rx: rx, rx: rx,
event_tx: HashMap::new(), event_tx: HashMap::new(),
last_response_id: 0, last_response_id: 0,
@ -91,7 +88,6 @@ where
} }
async fn handshake(&mut self) -> result::Result<types::QmpHandshake> { async fn handshake(&mut self) -> result::Result<types::QmpHandshake> {
self.state = QmpClientState::HandshakingServer;
let handshake_as_string = io::read_qmp_line(&mut self.stream).await?; let handshake_as_string = io::read_qmp_line(&mut self.stream).await?;
let handshake_json = let handshake_json =
serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?; serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?;
@ -107,18 +103,18 @@ where
// Read what the server responded back // Read what the server responded back
// FIXME: Make sure it's not an error. // FIXME: Make sure it's not an error.
let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?; let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?;
// let handshake_server_response_json = // let handshake_server_response_json =
// serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?; // serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?;
self.state = QmpClientState::Ready; self.state = QmpActorState::Ready;
Ok(handshake_json) Ok(handshake_json)
} }
async fn handle_message(&mut self, msg: QmpActorMessage) { async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
match msg { match msg {
QmpActorMessage::DoClose => { QmpActorMessage::DoClose => {
println!("Byes byes!!!") // FIXME: Actually close things.
} }
QmpActorMessage::QmpDoHandshake { tx } => { QmpActorMessage::QmpDoHandshake { tx } => {
@ -129,19 +125,25 @@ where
// this can only really fail in a way where it panics i'm pretty sure // this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx); self.event_tx.insert(event_name.to_uppercase(), tx);
},
QmpActorMessage::QmpDoSend { command, arguments, tx } => {
self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?;
self.last_response_id += 1;
} }
_ => tracing::error!("Unknown message"), _ => tracing::error!("Unknown message"),
} }
Ok(())
} }
async fn run(mut self) { async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> {
loop { let res = serde_json::from_str::<serde_json::Value>(line.as_str())?;
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
let res = serde_json::from_str::<serde_json::Value>(val.as_str()).expect("why it not json fucker bing");
// Check if this is an event // Check if this is an event
if let Some(event) = res.get("event") { if let Some(event) = res.get("event") {
@ -149,17 +151,55 @@ where
// Send the event to the subscriber // Send the event to the subscriber
if let Some(tx) = self.event_tx.get(&str) { if let Some(tx) = self.event_tx.get(&str) {
let _ = tx.send(res).await; let _ = tx.send(res.clone()).await;
} }
} }
// Check for a execute resposne
if let Some(id) = res.get("id") {
let id = id.as_u64().unwrap() as u32;
// Send the response
if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap();
let _ = tx.send(res.clone());
self.response_tx.remove(&id);
} }
Err(err) => { println!("stream error {err}"); break } }
Ok(())
}
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> {
// Send
let execute = QmpExecute {
execute: command,
id: self.last_response_id,
arguments: arguments.clone()
};
let serialized = serde_json::ser::to_string(&execute)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
Ok(())
}
/// Actor runloop.
async fn run(mut self) -> result::Result<()> {
loop {
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
self.handle_qmp_read(&val).await?;
}
Err(err) => { tracing::error!("stream error {err}"); break }
} }
}, },
msg_opt = self.rx.recv() => { msg_opt = self.rx.recv() => {
if let Some(msg) = msg_opt { if let Some(msg) = msg_opt {
self.handle_message(msg).await; self.handle_message(msg).await?;
} else { } else {
break break
} }
@ -168,12 +208,30 @@ where
else => break else => break
} }
} }
Ok(())
}
/// Helper function to spawn the actor.
fn spawn_actor(socket: S) -> mpsc::Sender<QmpActorMessage>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move {
let res = inner.run().await;
res.inspect_err(|err| {
tracing::error!("error in actor runloop: {err}");
});
});
tx
} }
} }
/// A client for the Qemu Machine Protocol (also known as QMP). /// A client for the Qemu Machine Protocol (also known as QMP).
/// Generic so it works with any Tokio stream type (which fits QEMU's ability to run /// Generic so it works with any Tokio stream type (which fits QEMU's ability to run
/// QMP across many protocols). /// QMP across many protocols). Currently isn't very typesafe but whatever.
/// ///
/// This struct is essentially a handle to an actor. /// This struct is essentially a handle to an actor.
/// I might maybe refactor this or make some generic "actor" thingmabob but for now /// I might maybe refactor this or make some generic "actor" thingmabob but for now
@ -192,12 +250,9 @@ impl QmpClient {
where where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{ {
let (tx, rx) = mpsc::channel(8); Self {
let inner = QmpClientActor::new(socket, rx); tx: QmpClientActor::spawn_actor(socket),
}
tokio::spawn(async move { inner.run().await });
Self { tx }
} }
// TODO handle errors properly // TODO handle errors properly
@ -205,18 +260,30 @@ impl QmpClient {
pub async fn handshake(&self) -> result::Result<types::QmpHandshake> { pub async fn handshake(&self) -> result::Result<types::QmpHandshake> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx let _ = self
.tx
.send(QmpActorMessage::QmpDoHandshake { tx: tx }) .send(QmpActorMessage::QmpDoHandshake { tx: tx })
.await; .await;
match rx.await { match rx.await {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => Err(result::QmpError::GeneralFail),
println!("???? {err}");
Err(result::QmpError::GeneralFail)
}
} }
} }
pub async fn execute(&self, command: String, arguments: Option<serde_json::Value>) -> result::Result<serde_json::Value> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx })
.await;
match rx.await {
Ok(res) => Ok(res),
Err(err) => Err(result::QmpError::GeneralFail),
}
}
/// Subscribes to a QMP event. /// Subscribes to a QMP event.
/// Returns the recvieving arm of a [mpsc::channel] which will recieve /// Returns the recvieving arm of a [mpsc::channel] which will recieve
/// the JSON event payloads when an event is sent by the server. /// the JSON event payloads when an event is sent by the server.
@ -226,7 +293,8 @@ impl QmpClient {
) -> result::Result<mpsc::Receiver<serde_json::Value>> { ) -> result::Result<mpsc::Receiver<serde_json::Value>> {
let (tx, rx) = mpsc::channel(8); let (tx, rx) = mpsc::channel(8);
let _ = self.tx let _ = self
.tx
.send(QmpActorMessage::QmpSubscribeToEvent { .send(QmpActorMessage::QmpSubscribeToEvent {
event_name: event_name, event_name: event_name,
tx: tx, tx: tx,

View file

@ -1,3 +1,5 @@
//! Low-level IO routines
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use super::result; use super::result;
@ -27,9 +29,13 @@ pub async fn write_qmp_line<S>(
where where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send, S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{ {
let delimited = format!("{}\n", string); // rough but whatever
let delimited = format!("{}\r\n", string);
stream.write_all(delimited.as_bytes()).await?; stream.write_all(delimited.as_bytes()).await?;
// FIXME: We probably shouldn't do this all the time but I mean
// QMP isn't that high bandwidth so it's probably fine.
stream.flush().await?; stream.flush().await?;
Ok(()) Ok(())

View file

@ -1,3 +1,5 @@
//! QMP client lib
pub(crate) mod types; pub(crate) mod types;
pub mod result; pub mod result;
pub(crate) mod io; pub(crate) mod io;

View file

@ -1,6 +1,7 @@
use thiserror::Error; use thiserror::Error;
/// QMP library error. /// QMP library error.
/// FIXME: more arms
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum QmpError { pub enum QmpError {
#[error(transparent)] #[error(transparent)]

View file

@ -26,3 +26,15 @@ pub struct QmpHandshake {
#[serde(rename = "QMP")] #[serde(rename = "QMP")]
pub qmp: QmpHandshakeQMP, pub qmp: QmpHandshakeQMP,
} }
#[derive(Serialize, Debug)]
pub struct QmpExecute {
pub execute: String,
/// Really, this is defined in the QMP as *any* JSON value,
/// but we only use a u32 here.
pub id: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: Option<serde_json::Value>
}