From 0ecbda47bb55a0e9b9ee6fb247d007ac488a2563 Mon Sep 17 00:00:00 2001 From: modeco80 Date: Thu, 9 May 2024 04:25:09 -0400 Subject: [PATCH] qmp: Implement disconnection fully the "testcase" has also added a test for both multiple producers (client handles to the same actor) and closing, which works as you'd expect. --- src/main.rs | 65 ++++++++++++++++++++++++++++++------------------ src/qmp/actor.rs | 40 ++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index b3bb4fb..8e27284 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,47 +2,64 @@ use cvm12_rs::qmp; use tokio::net::UnixStream; +use std::time::Duration; +use tokio::time; + #[tokio::main] async fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(); - let subscriber = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::TRACE) - .finish(); + tracing::subscriber::set_global_default(subscriber).expect("You Banned"); - tracing::subscriber::set_global_default(subscriber).expect("You Banned"); - - - // Create a stream to connect + // Create a stream to connect let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock") .await .expect("Could not connect"); + let client = qmp::QmpClient::new(stream); - let client = qmp::QmpClient::new(stream); + // Handshake QMP + client.handshake().await.expect("Could not handshake QMP"); - // Handshake QMP - client.handshake().await.expect("Could not handshake QMP"); + println!("Connected to QMP server"); - println!("Connected to QMP server"); + // Create a copy of the client handle so we can issue a command in this other task. + // This other task waits 10 seconds and closes the client, which causes the actor to stop. + let copy = client.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Closing client after 10 seconds"); + copy.close().await.expect("Closing shouldn't fail"); + }); - println!("res {}", client.execute_hmp("info block".into()).await.expect("this shouldn't fail")); + println!( + "res {}", + client + .execute_hmp("info block".into()) + .await + .expect("this shouldn't fail") + ); - // let's try to get all STOP events from QMP now. - let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); + // let's try to get all STOP events from QMP now. + let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap(); - // If this worked, then now we can recieve all the events. - // This code here allows running a VM with the -no-shutdown option, - // automatially resetting it on shutdown. - while let Some(message) = rx.recv().await { - println!("Got stop event! {:?}", message); + // If this worked, then now we can recieve all the events. + // This code here allows running a VM with the -no-shutdown option, + // automatially resetting it on shutdown. + while let Some(message) = rx.recv().await { + println!("Got stop event! {:?}", message); - let res1 = client.execute("system_reset".into(), None).await.expect("FUCK"); - let res2 = client.execute("cont".into(), None).await.expect("FUCK"); + let res1 = client + .execute("system_reset".into(), None) + .await + .expect("FUCK"); + let res2 = client.execute("cont".into(), None).await.expect("FUCK"); - println!("Result of running: {:?}, {:?}", res1, res2); - } - + println!("Result of running: {:?}, {:?}", res1, res2); + } //println!("Hello, world!"); } diff --git a/src/qmp/actor.rs b/src/qmp/actor.rs index ab3a889..81d0a2f 100644 --- a/src/qmp/actor.rs +++ b/src/qmp/actor.rs @@ -111,7 +111,16 @@ where async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> { match msg { QmpActorMessage::QmpDoClose => { - // FIXME: Actually close things. + self.state = QmpActorState::Disconnected; + + self.stream.shutdown().await?; + + // Clear all stuff. This will drop all channels and stop them safelt + self.response_tx.clear(); + self.event_tx.clear(); + + // Close our RX which will ultimately cause us to end + self.rx.close(); } QmpActorMessage::QmpDoHandshake { tx } => { @@ -122,12 +131,16 @@ where // this can only really fail in a way where it panics i'm pretty sure QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => { self.event_tx.insert(event_name.to_uppercase(), tx); - }, + } - QmpActorMessage::QmpDoSend { command, arguments, tx } => { - if self.state != QmpActorState::Ready { - return Err(result::QmpError::InvalidState); - } + QmpActorMessage::QmpDoSend { + command, + arguments, + tx, + } => { + if self.state != QmpActorState::Ready { + return Err(result::QmpError::InvalidState); + } self.response_tx.insert(self.last_response_id, Some(tx)); self.send_execute(command, &arguments).await?; @@ -156,7 +169,7 @@ where let id = id.as_u64().unwrap() as u32; // Send the response to the oneshot channel that's expecting it then remove - // the key from the HashMap, since it's no longer needed. + // the key from the HashMap, since it's no longer needed. if let Some(txopt) = self.response_tx.get_mut(&id) { let tx = txopt.take().unwrap(); let _ = tx.send(res.clone()); @@ -164,16 +177,19 @@ where } } - Ok(()) } - async fn send_execute(&mut self, command: String, arguments: &Option) -> result::Result<()> { + async fn send_execute( + &mut self, + command: String, + arguments: &Option, + ) -> result::Result<()> { // Send let execute = QmpExecute { execute: command, id: self.last_response_id, - arguments: arguments.clone() + arguments: arguments.clone(), }; let serialized = serde_json::ser::to_string(&execute)?; @@ -216,11 +232,11 @@ where { let (tx, rx) = mpsc::channel(8); let inner = QmpClientActor::new(socket, rx); - tokio::spawn(async move { + tokio::spawn(async move { inner.run().await.inspect_err(|err| { tracing::error!("error in actor runloop: {err}"); }); - }); + }); tx } }