Compare commits
2 commits
0e017cd00d
...
0ecbda47bb
Author | SHA1 | Date | |
---|---|---|---|
0ecbda47bb | |||
c4caa5f643 |
3 changed files with 98 additions and 38 deletions
66
src/main.rs
66
src/main.rs
|
@ -2,44 +2,64 @@ use cvm12_rs::qmp;
|
|||
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
use std::time::Duration;
|
||||
use tokio::time;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let subscriber = tracing_subscriber::FmtSubscriber::builder()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.finish();
|
||||
|
||||
let subscriber = tracing_subscriber::FmtSubscriber::builder()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.finish();
|
||||
tracing::subscriber::set_global_default(subscriber).expect("You Banned");
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber).expect("You Banned");
|
||||
|
||||
|
||||
// Create a stream to connect
|
||||
// Create a stream to connect
|
||||
let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock")
|
||||
.await
|
||||
.expect("Could not connect");
|
||||
|
||||
let client = qmp::QmpClient::new(stream);
|
||||
|
||||
let client = qmp::QmpClient::new(stream);
|
||||
// Handshake QMP
|
||||
client.handshake().await.expect("Could not handshake QMP");
|
||||
|
||||
// Handshake QMP
|
||||
client.handshake().await.expect("Could not handshake QMP");
|
||||
println!("Connected to QMP server");
|
||||
|
||||
println!("Connected to QMP server");
|
||||
// Create a copy of the client handle so we can issue a command in this other task.
|
||||
// This other task waits 10 seconds and closes the client, which causes the actor to stop.
|
||||
let copy = client.clone();
|
||||
|
||||
// let's try to get all STOP events from QMP now.
|
||||
let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
println!("Closing client after 10 seconds");
|
||||
copy.close().await.expect("Closing shouldn't fail");
|
||||
});
|
||||
|
||||
// If this worked, then now we can recieve all the events.
|
||||
// This code here allows running a VM with the -no-shutdown option,
|
||||
// automatially resetting it on shutdown.
|
||||
while let Some(message) = rx.recv().await {
|
||||
println!("Got stop event! {:?}", message);
|
||||
println!(
|
||||
"res {}",
|
||||
client
|
||||
.execute_hmp("info block".into())
|
||||
.await
|
||||
.expect("this shouldn't fail")
|
||||
);
|
||||
|
||||
let res1 = client.execute("system_reset".into(), None).await.expect("FUCK");
|
||||
let res2 = client.execute("cont".into(), None).await.expect("FUCK");
|
||||
// let's try to get all STOP events from QMP now.
|
||||
let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();
|
||||
|
||||
println!("Result of running: {:?}, {:?}", res1, res2);
|
||||
}
|
||||
|
||||
// If this worked, then now we can recieve all the events.
|
||||
// This code here allows running a VM with the -no-shutdown option,
|
||||
// automatially resetting it on shutdown.
|
||||
while let Some(message) = rx.recv().await {
|
||||
println!("Got stop event! {:?}", message);
|
||||
|
||||
let res1 = client
|
||||
.execute("system_reset".into(), None)
|
||||
.await
|
||||
.expect("FUCK");
|
||||
let res2 = client.execute("cont".into(), None).await.expect("FUCK");
|
||||
|
||||
println!("Result of running: {:?}, {:?}", res1, res2);
|
||||
}
|
||||
|
||||
//println!("Hello, world!");
|
||||
}
|
||||
|
|
|
@ -111,7 +111,16 @@ where
|
|||
async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
|
||||
match msg {
|
||||
QmpActorMessage::QmpDoClose => {
|
||||
// FIXME: Actually close things.
|
||||
self.state = QmpActorState::Disconnected;
|
||||
|
||||
self.stream.shutdown().await?;
|
||||
|
||||
// Clear all stuff. This will drop all channels and stop them safelt
|
||||
self.response_tx.clear();
|
||||
self.event_tx.clear();
|
||||
|
||||
// Close our RX which will ultimately cause us to end
|
||||
self.rx.close();
|
||||
}
|
||||
|
||||
QmpActorMessage::QmpDoHandshake { tx } => {
|
||||
|
@ -122,12 +131,16 @@ where
|
|||
// this can only really fail in a way where it panics i'm pretty sure
|
||||
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
|
||||
self.event_tx.insert(event_name.to_uppercase(), tx);
|
||||
},
|
||||
}
|
||||
|
||||
QmpActorMessage::QmpDoSend { command, arguments, tx } => {
|
||||
if self.state != QmpActorState::Ready {
|
||||
return Err(result::QmpError::InvalidState);
|
||||
}
|
||||
QmpActorMessage::QmpDoSend {
|
||||
command,
|
||||
arguments,
|
||||
tx,
|
||||
} => {
|
||||
if self.state != QmpActorState::Ready {
|
||||
return Err(result::QmpError::InvalidState);
|
||||
}
|
||||
|
||||
self.response_tx.insert(self.last_response_id, Some(tx));
|
||||
self.send_execute(command, &arguments).await?;
|
||||
|
@ -156,7 +169,7 @@ where
|
|||
let id = id.as_u64().unwrap() as u32;
|
||||
|
||||
// Send the response to the oneshot channel that's expecting it then remove
|
||||
// the key from the HashMap, since it's no longer needed.
|
||||
// the key from the HashMap, since it's no longer needed.
|
||||
if let Some(txopt) = self.response_tx.get_mut(&id) {
|
||||
let tx = txopt.take().unwrap();
|
||||
let _ = tx.send(res.clone());
|
||||
|
@ -164,16 +177,19 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> {
|
||||
async fn send_execute(
|
||||
&mut self,
|
||||
command: String,
|
||||
arguments: &Option<serde_json::Value>,
|
||||
) -> result::Result<()> {
|
||||
// Send
|
||||
let execute = QmpExecute {
|
||||
execute: command,
|
||||
id: self.last_response_id,
|
||||
arguments: arguments.clone()
|
||||
arguments: arguments.clone(),
|
||||
};
|
||||
|
||||
let serialized = serde_json::ser::to_string(&execute)?;
|
||||
|
@ -216,12 +232,11 @@ where
|
|||
{
|
||||
let (tx, rx) = mpsc::channel(8);
|
||||
let inner = QmpClientActor::new(socket, rx);
|
||||
tokio::spawn(async move {
|
||||
let res = inner.run().await;
|
||||
res.inspect_err(|err| {
|
||||
tokio::spawn(async move {
|
||||
inner.run().await.inspect_err(|err| {
|
||||
tracing::error!("error in actor runloop: {err}");
|
||||
});
|
||||
});
|
||||
});
|
||||
tx
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//! QMP client.
|
||||
|
||||
use serde_json::json;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::{mpsc, oneshot},
|
||||
|
@ -35,6 +36,16 @@ impl QmpClient {
|
|||
|
||||
// TODO handle errors properly
|
||||
|
||||
/// Closes this client.
|
||||
pub async fn close(&self) -> result::Result<()> {
|
||||
let _ = self
|
||||
.tx
|
||||
.send(actor::QmpActorMessage::QmpDoClose { })
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handshakes QMP with the server. This **MUST** be the first operation you do.
|
||||
pub async fn handshake(&self) -> result::Result<types::QmpHandshake> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
|
@ -48,7 +59,10 @@ impl QmpClient {
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: QMP errors should bubble out into an Rust error,
|
||||
// for now this isn't done.
|
||||
|
||||
/// Executes a single QMP command.
|
||||
pub async fn execute(&self, command: String, arguments: Option<serde_json::Value>) -> result::Result<serde_json::Value> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
|
@ -62,7 +76,18 @@ impl QmpClient {
|
|||
Err(err) => Err(result::QmpError::GeneralFail),
|
||||
}
|
||||
}
|
||||
/// Subscribes to a QMP event.
|
||||
|
||||
/// Executes a HMP (QEMU Monitor) command.
|
||||
pub async fn execute_hmp(&self, hmp_line: String) -> result::Result<String> {
|
||||
let res = self.execute("human-monitor-command".into(), Some(json!({
|
||||
"command-line": hmp_line
|
||||
}))).await?;
|
||||
|
||||
// This is very nasty, but this is always a string.
|
||||
Ok(String::from(res["return"].as_str().unwrap()))
|
||||
}
|
||||
|
||||
/// Subscribes to a QMP event with the given event name.
|
||||
/// Returns the recvieving arm of a [mpsc::channel] which will recieve
|
||||
/// the JSON event payloads when an event is sent by the server.
|
||||
pub async fn subscribe_to_event(
|
||||
|
|
Loading…
Reference in a new issue