diff --git a/src/main.rs b/src/main.rs index cf71f1b..091c2ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,8 +11,8 @@ async fn main() { tracing::subscriber::set_global_default(subscriber).expect("You Banned"); - // This is test code.. + // Create a stream to connect let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock") .await .expect("Could not connect"); @@ -20,15 +20,24 @@ async fn main() { let client = qmp::QmpClient::new(stream); + // Handshake QMP client.handshake().await.expect("Could not handshake QMP"); println!("handshook QMP"); - // let's try to get all RESET events - let mut rx = client.subscribe_to_event("RESET".into()).await.unwrap(); + // let's try to get all STOP events from QMP now. + let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); + // If this worked, then now we can recieve all the events. + // This code here allows running a VM with the -no-shutdown option, + // automatially resetting it on shutdown. while let Some(message) = rx.recv().await { - println!("Got event! {:?}", message); + println!("Got stop event! {:?}", message); + + let res1 = client.execute("system_reset".into(), None).await.expect("FUCK"); + let res2 = client.execute("cont".into(), None).await.expect("FUCK"); + + println!("Result of running: {:?}, {:?}", res1, res2); } diff --git a/src/qmp/client.rs b/src/qmp/client.rs index 97aa6ec..41ea7f6 100644 --- a/src/qmp/client.rs +++ b/src/qmp/client.rs @@ -3,29 +3,24 @@ use std::collections::HashMap; -use std::time::Duration; - +use serde_json::json; // TODO use tokio::{ - io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}, - sync::{mpsc, mpsc::error::TryRecvError, oneshot}, + io::{AsyncReadExt, AsyncWriteExt, BufStream}, + sync::{mpsc, oneshot}, }; use super::{ - result, - io, - types::{self, QmpHandshake}, + io, result, + types::{self, QmpExecute, QmpHandshake}, }; -enum QmpClientState { +enum QmpActorState { /// Disconnected. Disconnected, ConnectedToServer, - /// The server - HandshakingServer, - /// Handshake passed successfully and we're ready. Ready, } @@ -42,6 +37,10 @@ pub enum QmpActorMessage { /// Used to send a QMP message. QmpDoSend { + /// The message to send + command: String, + arguments: Option, + /// The oneshot response. tx: oneshot::Sender, }, @@ -53,14 +52,12 @@ pub enum QmpActorMessage { }, } - - struct QmpClientActor where S: AsyncReadExt + AsyncWriteExt + Unpin + Send, { stream: BufStream, - state: QmpClientState, + state: QmpActorState, rx: mpsc::Receiver, @@ -72,7 +69,7 @@ where last_response_id: u32, /// All in flight responses. - response_tx: HashMap>, + response_tx: HashMap>>, } impl QmpClientActor @@ -82,7 +79,7 @@ where fn new(stream: S, rx: mpsc::Receiver) -> Self { Self { stream: BufStream::new(stream), - state: QmpClientState::ConnectedToServer, + state: QmpActorState::ConnectedToServer, rx: rx, event_tx: HashMap::new(), last_response_id: 0, @@ -91,7 +88,6 @@ where } async fn handshake(&mut self) -> result::Result { - self.state = QmpClientState::HandshakingServer; let handshake_as_string = io::read_qmp_line(&mut self.stream).await?; let handshake_json = serde_json::from_str::(handshake_as_string.as_str())?; @@ -101,24 +97,24 @@ where "execute": "qmp_capabilities" }); let serialized = serde_json::ser::to_string(&handshake_response)?; - + io::write_qmp_line(&mut self.stream, &serialized).await?; // Read what the server responded back // FIXME: Make sure it's not an error. let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?; -// let handshake_server_response_json = -// serde_json::from_str::(handshake_server_response_as_string.as_str())?; + // let handshake_server_response_json = + // serde_json::from_str::(handshake_server_response_as_string.as_str())?; - self.state = QmpClientState::Ready; + self.state = QmpActorState::Ready; Ok(handshake_json) } - async fn handle_message(&mut self, msg: QmpActorMessage) { + async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> { match msg { QmpActorMessage::DoClose => { - println!("Byes byes!!!") + // FIXME: Actually close things. } QmpActorMessage::QmpDoHandshake { tx } => { @@ -129,37 +125,81 @@ where // this can only really fail in a way where it panics i'm pretty sure QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { self.event_tx.insert(event_name.to_uppercase(), tx); + }, + + QmpActorMessage::QmpDoSend { command, arguments, tx } => { + self.response_tx.insert(self.last_response_id, Some(tx)); + self.send_execute(command, &arguments).await?; + self.last_response_id += 1; } _ => tracing::error!("Unknown message"), } + + Ok(()) } - async fn run(mut self) { + async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> { + let res = serde_json::from_str::(line.as_str())?; + + + + + // Check if this is an event + if let Some(event) = res.get("event") { + let str = String::from(event.as_str().unwrap()); + + // Send the event to the subscriber + if let Some(tx) = self.event_tx.get(&str) { + let _ = tx.send(res.clone()).await; + } + } + + // Check for a execute resposne + if let Some(id) = res.get("id") { + let id = id.as_u64().unwrap() as u32; + + // Send the response + if let Some(txopt) = self.response_tx.get_mut(&id) { + let tx = txopt.take().unwrap(); + let _ = tx.send(res.clone()); + self.response_tx.remove(&id); + } + } + + + Ok(()) + } + + async fn send_execute(&mut self, command: String, arguments: &Option) -> result::Result<()> { + // Send + let execute = QmpExecute { + execute: command, + id: self.last_response_id, + arguments: arguments.clone() + }; + + let serialized = serde_json::ser::to_string(&execute)?; + io::write_qmp_line(&mut self.stream, &serialized).await?; + Ok(()) + } + + /// Actor runloop. + async fn run(mut self) -> result::Result<()> { loop { tokio::select! { str = io::read_qmp_line(&mut self.stream) => { match str { Ok(val) => { - let res = serde_json::from_str::(val.as_str()).expect("why it not json fucker bing"); - - // Check if this is an event - if let Some(event) = res.get("event") { - let str = String::from(event.as_str().unwrap()); - - // Send the event to the subscriber - if let Some(tx) = self.event_tx.get(&str) { - let _ = tx.send(res).await; - } - } + self.handle_qmp_read(&val).await?; } - Err(err) => { println!("stream error {err}"); break } + Err(err) => { tracing::error!("stream error {err}"); break } } }, msg_opt = self.rx.recv() => { if let Some(msg) = msg_opt { - self.handle_message(msg).await; + self.handle_message(msg).await?; } else { break } @@ -168,13 +208,31 @@ where else => break } } + + Ok(()) + } + + /// Helper function to spawn the actor. + fn spawn_actor(socket: S) -> mpsc::Sender + where + S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, + { + let (tx, rx) = mpsc::channel(8); + let inner = QmpClientActor::new(socket, rx); + tokio::spawn(async move { + let res = inner.run().await; + res.inspect_err(|err| { + tracing::error!("error in actor runloop: {err}"); + }); + }); + tx } } /// A client for the Qemu Machine Protocol (also known as QMP). /// Generic so it works with any Tokio stream type (which fits QEMU's ability to run -/// QMP across many protocols). -/// +/// QMP across many protocols). Currently isn't very typesafe but whatever. +/// /// This struct is essentially a handle to an actor. /// I might maybe refactor this or make some generic "actor" thingmabob but for now /// it's all in here. @@ -192,12 +250,9 @@ impl QmpClient { where S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, { - let (tx, rx) = mpsc::channel(8); - let inner = QmpClientActor::new(socket, rx); - - tokio::spawn(async move { inner.run().await }); - - Self { tx } + Self { + tx: QmpClientActor::spawn_actor(socket), + } } // TODO handle errors properly @@ -205,19 +260,31 @@ impl QmpClient { pub async fn handshake(&self) -> result::Result { let (tx, rx) = oneshot::channel(); - let _ = self.tx + let _ = self + .tx .send(QmpActorMessage::QmpDoHandshake { tx: tx }) .await; match rx.await { Ok(res) => res, - Err(err) => { - println!("???? {err}"); - Err(result::QmpError::GeneralFail) - } + Err(err) => Err(result::QmpError::GeneralFail), } } - /// Subscribes to a QMP event. + + pub async fn execute(&self, command: String, arguments: Option) -> result::Result { + let (tx, rx) = oneshot::channel(); + + let _ = self + .tx + .send(QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx }) + .await; + + match rx.await { + Ok(res) => Ok(res), + Err(err) => Err(result::QmpError::GeneralFail), + } + } + /// Subscribes to a QMP event. /// Returns the recvieving arm of a [mpsc::channel] which will recieve /// the JSON event payloads when an event is sent by the server. pub async fn subscribe_to_event( @@ -226,7 +293,8 @@ impl QmpClient { ) -> result::Result> { let (tx, rx) = mpsc::channel(8); - let _ = self.tx + let _ = self + .tx .send(QmpActorMessage::QmpSubscribeToEvent { event_name: event_name, tx: tx, diff --git a/src/qmp/io.rs b/src/qmp/io.rs index 69476ce..0c81609 100644 --- a/src/qmp/io.rs +++ b/src/qmp/io.rs @@ -1,3 +1,5 @@ +//! Low-level IO routines + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}; use super::result; @@ -27,9 +29,13 @@ pub async fn write_qmp_line( where S: AsyncReadExt + AsyncWriteExt + Unpin + Send, { - let delimited = format!("{}\n", string); + // rough but whatever + let delimited = format!("{}\r\n", string); stream.write_all(delimited.as_bytes()).await?; + + // FIXME: We probably shouldn't do this all the time but I mean + // QMP isn't that high bandwidth so it's probably fine. stream.flush().await?; Ok(()) diff --git a/src/qmp/mod.rs b/src/qmp/mod.rs index 72815e5..df95a50 100644 --- a/src/qmp/mod.rs +++ b/src/qmp/mod.rs @@ -1,3 +1,5 @@ +//! QMP client lib + pub(crate) mod types; pub mod result; pub(crate) mod io; diff --git a/src/qmp/result.rs b/src/qmp/result.rs index 92dcf9c..50c9111 100644 --- a/src/qmp/result.rs +++ b/src/qmp/result.rs @@ -1,6 +1,7 @@ use thiserror::Error; /// QMP library error. +/// FIXME: more arms #[derive(Error, Debug)] pub enum QmpError { #[error(transparent)] diff --git a/src/qmp/types.rs b/src/qmp/types.rs index efd3917..96a4fc9 100644 --- a/src/qmp/types.rs +++ b/src/qmp/types.rs @@ -25,4 +25,16 @@ pub struct QmpHandshakeQMP { pub struct QmpHandshake { #[serde(rename = "QMP")] pub qmp: QmpHandshakeQMP, +} + +#[derive(Serialize, Debug)] +pub struct QmpExecute { + pub execute: String, + + /// Really, this is defined in the QMP as *any* JSON value, + /// but we only use a u32 here. + pub id: u32, + + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option } \ No newline at end of file