qmp: Implement disconnection fully

the "testcase" has also added a test for both multiple producers (client handles to the same actor) and closing, which works as you'd expect.
This commit is contained in:
Lily Tsuru 2024-05-09 04:25:09 -04:00
parent c4caa5f643
commit 0ecbda47bb
2 changed files with 69 additions and 36 deletions

View file

@ -2,47 +2,64 @@ use cvm12_rs::qmp;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use std::time::Duration;
use tokio::time;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
let subscriber = tracing_subscriber::FmtSubscriber::builder() tracing::subscriber::set_global_default(subscriber).expect("You Banned");
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("You Banned"); // Create a stream to connect
// Create a stream to connect
let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock") let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock")
.await .await
.expect("Could not connect"); .expect("Could not connect");
let client = qmp::QmpClient::new(stream);
let client = qmp::QmpClient::new(stream); // Handshake QMP
client.handshake().await.expect("Could not handshake QMP");
// Handshake QMP println!("Connected to QMP server");
client.handshake().await.expect("Could not handshake QMP");
println!("Connected to QMP server"); // Create a copy of the client handle so we can issue a command in this other task.
// This other task waits 10 seconds and closes the client, which causes the actor to stop.
let copy = client.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
println!("Closing client after 10 seconds");
copy.close().await.expect("Closing shouldn't fail");
});
println!("res {}", client.execute_hmp("info block".into()).await.expect("this shouldn't fail")); println!(
"res {}",
client
.execute_hmp("info block".into())
.await
.expect("this shouldn't fail")
);
// let's try to get all STOP events from QMP now. // let's try to get all STOP events from QMP now.
let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();
// If this worked, then now we can recieve all the events. // If this worked, then now we can recieve all the events.
// This code here allows running a VM with the -no-shutdown option, // This code here allows running a VM with the -no-shutdown option,
// automatially resetting it on shutdown. // automatially resetting it on shutdown.
while let Some(message) = rx.recv().await { while let Some(message) = rx.recv().await {
println!("Got stop event! {:?}", message); println!("Got stop event! {:?}", message);
let res1 = client.execute("system_reset".into(), None).await.expect("FUCK"); let res1 = client
let res2 = client.execute("cont".into(), None).await.expect("FUCK"); .execute("system_reset".into(), None)
.await
.expect("FUCK");
let res2 = client.execute("cont".into(), None).await.expect("FUCK");
println!("Result of running: {:?}, {:?}", res1, res2); println!("Result of running: {:?}, {:?}", res1, res2);
} }
//println!("Hello, world!"); //println!("Hello, world!");
} }

View file

@ -111,7 +111,16 @@ where
async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> { async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
match msg { match msg {
QmpActorMessage::QmpDoClose => { QmpActorMessage::QmpDoClose => {
// FIXME: Actually close things. self.state = QmpActorState::Disconnected;
self.stream.shutdown().await?;
// Clear all stuff. This will drop all channels and stop them safelt
self.response_tx.clear();
self.event_tx.clear();
// Close our RX which will ultimately cause us to end
self.rx.close();
} }
QmpActorMessage::QmpDoHandshake { tx } => { QmpActorMessage::QmpDoHandshake { tx } => {
@ -122,12 +131,16 @@ where
// this can only really fail in a way where it panics i'm pretty sure // this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx); self.event_tx.insert(event_name.to_uppercase(), tx);
}, }
QmpActorMessage::QmpDoSend { command, arguments, tx } => { QmpActorMessage::QmpDoSend {
if self.state != QmpActorState::Ready { command,
return Err(result::QmpError::InvalidState); arguments,
} tx,
} => {
if self.state != QmpActorState::Ready {
return Err(result::QmpError::InvalidState);
}
self.response_tx.insert(self.last_response_id, Some(tx)); self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?; self.send_execute(command, &arguments).await?;
@ -156,7 +169,7 @@ where
let id = id.as_u64().unwrap() as u32; let id = id.as_u64().unwrap() as u32;
// Send the response to the oneshot channel that's expecting it then remove // Send the response to the oneshot channel that's expecting it then remove
// the key from the HashMap, since it's no longer needed. // the key from the HashMap, since it's no longer needed.
if let Some(txopt) = self.response_tx.get_mut(&id) { if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap(); let tx = txopt.take().unwrap();
let _ = tx.send(res.clone()); let _ = tx.send(res.clone());
@ -164,16 +177,19 @@ where
} }
} }
Ok(()) Ok(())
} }
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> { async fn send_execute(
&mut self,
command: String,
arguments: &Option<serde_json::Value>,
) -> result::Result<()> {
// Send // Send
let execute = QmpExecute { let execute = QmpExecute {
execute: command, execute: command,
id: self.last_response_id, id: self.last_response_id,
arguments: arguments.clone() arguments: arguments.clone(),
}; };
let serialized = serde_json::ser::to_string(&execute)?; let serialized = serde_json::ser::to_string(&execute)?;
@ -216,11 +232,11 @@ where
{ {
let (tx, rx) = mpsc::channel(8); let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx); let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move { tokio::spawn(async move {
inner.run().await.inspect_err(|err| { inner.run().await.inspect_err(|err| {
tracing::error!("error in actor runloop: {err}"); tracing::error!("error in actor runloop: {err}");
}); });
}); });
tx tx
} }
} }