diff --git a/src/main.rs b/src/main.rs index 091c2ed..733d560 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ -use cvm12_rs::{guac, qmp}; +use cvm12_rs::qmp; -use tokio::net::{UnixSocket, UnixStream}; +use tokio::net::UnixStream; #[tokio::main] async fn main() { @@ -23,7 +23,7 @@ async fn main() { // Handshake QMP client.handshake().await.expect("Could not handshake QMP"); - println!("handshook QMP"); + println!("Connected to QMP server"); // let's try to get all STOP events from QMP now. let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); diff --git a/src/qmp/actor.rs b/src/qmp/actor.rs index f023372..8d02f76 100644 --- a/src/qmp/actor.rs +++ b/src/qmp/actor.rs @@ -11,6 +11,7 @@ use super::{ types::{self, QmpExecute, QmpHandshake}, }; +#[derive(PartialEq, Eq)] enum QmpActorState { /// Disconnected. Disconnected, @@ -124,6 +125,10 @@ where }, QmpActorMessage::QmpDoSend { command, arguments, tx } => { + if self.state != QmpActorState::Ready { + return Err(result::QmpError::InvalidState); + } + self.response_tx.insert(self.last_response_id, Some(tx)); self.send_execute(command, &arguments).await?; self.last_response_id += 1; @@ -138,10 +143,6 @@ where async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> { let res = serde_json::from_str::(line.as_str())?; - - - - // Check if this is an event if let Some(event) = res.get("event") { let str = String::from(event.as_str().unwrap()); @@ -151,11 +152,11 @@ where } } - // Check for a execute resposne if let Some(id) = res.get("id") { let id = id.as_u64().unwrap() as u32; - // Send the response + // Send the response to the oneshot channel that's expecting it then remove + // the key from the HashMap, since it's no longer needed. if let Some(txopt) = self.response_tx.get_mut(&id) { let tx = txopt.take().unwrap(); let _ = tx.send(res.clone()); diff --git a/src/qmp/result.rs b/src/qmp/result.rs index 50c9111..932615c 100644 --- a/src/qmp/result.rs +++ b/src/qmp/result.rs @@ -13,6 +13,9 @@ pub enum QmpError { #[error(transparent)] JsonError(#[from] serde_json::Error), + #[error("invalid state for operation")] + InvalidState, + #[error("general failure")] GeneralFail, }