qmp: gate off execute() calls until ready

This commit is contained in:
Lily Tsuru 2024-05-09 01:00:12 -04:00
parent 51c6356261
commit 0e017cd00d
3 changed files with 13 additions and 9 deletions

View file

@ -1,6 +1,6 @@
use cvm12_rs::{guac, qmp}; use cvm12_rs::qmp;
use tokio::net::{UnixSocket, UnixStream}; use tokio::net::UnixStream;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -23,7 +23,7 @@ async fn main() {
// Handshake QMP // Handshake QMP
client.handshake().await.expect("Could not handshake QMP"); client.handshake().await.expect("Could not handshake QMP");
println!("handshook QMP"); println!("Connected to QMP server");
// let's try to get all STOP events from QMP now. // let's try to get all STOP events from QMP now.
let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();

View file

@ -11,6 +11,7 @@ use super::{
types::{self, QmpExecute, QmpHandshake}, types::{self, QmpExecute, QmpHandshake},
}; };
#[derive(PartialEq, Eq)]
enum QmpActorState { enum QmpActorState {
/// Disconnected. /// Disconnected.
Disconnected, Disconnected,
@ -124,6 +125,10 @@ where
}, },
QmpActorMessage::QmpDoSend { command, arguments, tx } => { QmpActorMessage::QmpDoSend { command, arguments, tx } => {
if self.state != QmpActorState::Ready {
return Err(result::QmpError::InvalidState);
}
self.response_tx.insert(self.last_response_id, Some(tx)); self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?; self.send_execute(command, &arguments).await?;
self.last_response_id += 1; self.last_response_id += 1;
@ -138,10 +143,6 @@ where
async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> { async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> {
let res = serde_json::from_str::<serde_json::Value>(line.as_str())?; let res = serde_json::from_str::<serde_json::Value>(line.as_str())?;
// Check if this is an event
if let Some(event) = res.get("event") { if let Some(event) = res.get("event") {
let str = String::from(event.as_str().unwrap()); let str = String::from(event.as_str().unwrap());
@ -151,11 +152,11 @@ where
} }
} }
// Check for a execute resposne
if let Some(id) = res.get("id") { if let Some(id) = res.get("id") {
let id = id.as_u64().unwrap() as u32; let id = id.as_u64().unwrap() as u32;
// Send the response // Send the response to the oneshot channel that's expecting it then remove
// the key from the HashMap, since it's no longer needed.
if let Some(txopt) = self.response_tx.get_mut(&id) { if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap(); let tx = txopt.take().unwrap();
let _ = tx.send(res.clone()); let _ = tx.send(res.clone());

View file

@ -13,6 +13,9 @@ pub enum QmpError {
#[error(transparent)] #[error(transparent)]
JsonError(#[from] serde_json::Error), JsonError(#[from] serde_json::Error),
#[error("invalid state for operation")]
InvalidState,
#[error("general failure")] #[error("general failure")]
GeneralFail, GeneralFail,
} }