qmp: working qmp stuff

I really should probably maybehapsibly split this out into a cargo workspace but /shrug, I'll do it if this gets too annoying.

I've split out a lot of code into new modules, just to be cleaner about organization.
This commit is contained in:
Lily Tsuru 2024-05-08 21:33:09 -04:00
parent 92925ab4d2
commit 4cda248591
15 changed files with 1734 additions and 40 deletions

1273
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,3 +4,14 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1.37.0", features = ["full"] }
axum = { version = "0.7.5", features = ["tokio"] }
turbojpeg = { version = "1.0.1" }
vnc-rs = "0.5.1"
toml_edit = "0.22.9"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
thiserror = "1.0.58"
serde = { version = "1.0.197", features = [ "derive" ] }
serde_json = "1.0.115"

View file

@ -6,39 +6,6 @@ Stopgap part 4. Mostly made as a experiment, and maybe for production use?.
There is none, at the moment. Once there is, there will be actual documentation here. There is none, at the moment. Once there is, there will be actual documentation here.
## Noodling for configuration
collabvm.toml
```toml
[collabvm]
listen = "127.0.0.1:3000" # or /run/cvm.sock ? if warp supports uds i guess
# optional
vm_directory = "vms"
# nested table. you can define it non-nested if you want
mod_permissions = { xxx = true, xyz = false }
mod_password_hash = "argon2 hash"
admin_password_hash = "argon2 hash"
# applied to vms by default unless they override
default_motd = "hice powered vms"
ban_command = "sexy $user $ip"
```
vms/vm1.toml
```toml
[vm]
name = "Window XP have honse (VM 1)"
# cvm1.2-rs will automatically append parameters as needed
command_line = "./vm1.lvm start"
motd = "YOU HAVE FUCKER GOOGLE FROM 999999999.. Banned!"
```
# Building # Building
`cargo b --release` `cargo b --release`

29
doc/collabvm.toml Normal file
View file

@ -0,0 +1,29 @@
listen = "127.0.0.1:3000" # or /run/cvm.sock ? if warp supports uds i guess
# optional
vm_directory = "vms"
# nested table. you can define it non-nested if you want
[mod_permissions]
xxx = false
xyz = true
# passwords hash
mod_password_hash = "md5 hash of password"
admin_password_hash = "md5 hash of password"
# Command to run to ban a user
# $user is replaced with the user, and
# $ip is replaced with the ip of the user
ban_command = "sexy $user $ip"
# defaults for all VMs
[default_permissions]
turns_enabled = true
votes_enabled = true
turn_time = 18
vote_time = 60
vote_cooldown_time = 120
# applied to all vms by default unless they override it in their toml file
motd = "Welcome to my UserVM!"

16
doc/vm.toml Normal file
View file

@ -0,0 +1,16 @@
# A descriptive name.
name = "Windows XP SP3 x86 (MyVM 1)"
# cvm1.2-rs will automatically append parameters as needed to the command line
command_line = "/uvm/vms/vm1.lvm start"
# (Optional) Override the MOTD set in the configuration for this VM only.
motd = "VM 1 MOTD"
# more overrides.
#turns_enabled = true
#votes_enabled = true
#turn_time = 18
#vote_time = 60
#vote_cooldown_time = 120

View file

@ -5,6 +5,8 @@ use std::fmt;
// type of a guac message // type of a guac message
pub type Elements = Vec<String>; pub type Elements = Vec<String>;
// FIXME: thiserror, please.
/// Errors during decoding /// Errors during decoding
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum DecodeError { pub enum DecodeError {
@ -90,7 +92,8 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult<Elements> {
// require a substring (or a slice, but if you can generate a slice bound, // require a substring (or a slice, but if you can generate a slice bound,
// you can also just scan the value in by hand.....) // you can also just scan the value in by hand.....)
// //
// We bound this anyways so it's fine(TM) // We bound this anyways and do quite the checks, so even though it's not great,
// it should be generally fine (TM).
loop { loop {
let c = chars[current_position]; let c = chars[current_position];
@ -106,10 +109,14 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult<Elements> {
current_position += 1; current_position += 1;
} }
// eat the '.'; our integer scanning ensures // Eat the '.' seperating the size and the element data;
// we only get here in that case // our integer scanning ensures we only get here in the case that this is actually the '.'
// character.
current_position += 1; current_position += 1;
// Make sure the element size doesn't overflow the decode policy
// or the size of the whole instruction.
if element_size >= policy.max_element_size() { if element_size >= policy.max_element_size() {
return Err(DecodeError::ElementTooLong); return Err(DecodeError::ElementTooLong);
} }

View file

@ -1 +1,4 @@
pub mod guac; pub mod guac;
pub mod qmp;
pub mod qemuvm;
pub mod vm;

View file

@ -1,5 +1,36 @@
use cvm12_rs::guac; use cvm12_rs::{guac, qmp};
fn main() { use tokio::net::{UnixSocket, UnixStream};
println!("Hello, world!");
#[tokio::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("You Banned");
// This is test code..
let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock")
.await
.expect("Could not connect");
let client = qmp::QmpClient::new(stream);
client.handshake().await.expect("Could not handshake QMP");
println!("handshook QMP");
// let's try to get all RESET events
let mut rx = client.subscribe_to_event("RESET".into()).await.unwrap();
while let Some(message) = rx.recv().await {
println!("Got event! {:?}", message);
}
//println!("Hello, world!");
} }

21
src/qemuvm.rs Normal file
View file

@ -0,0 +1,21 @@
// hacked together
pub struct QemuVM {
}
impl QemuVM {
// fn new()->Self; // spawns manager
//
}
impl Drop for QemuVM {
fn drop(&mut self) {
}
}

239
src/qmp/client.rs Normal file
View file

@ -0,0 +1,239 @@
//! QMP client.
//! Uses a broadcast channel.
use std::collections::HashMap;
use std::time::Duration;
// TODO
use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream},
sync::{mpsc, mpsc::error::TryRecvError, oneshot},
};
use super::{
result,
io,
types::{self, QmpHandshake},
};
enum QmpClientState {
/// Disconnected.
Disconnected,
ConnectedToServer,
/// The server
HandshakingServer,
/// Handshake passed successfully and we're ready.
Ready,
}
#[derive(Debug)]
pub enum QmpActorMessage {
/// Close
DoClose,
QmpDoHandshake {
/// The oneshot response.
tx: oneshot::Sender<super::result::Result<QmpHandshake>>,
},
/// Used to send a QMP message.
QmpDoSend {
/// The oneshot response.
tx: oneshot::Sender<serde_json::Value>,
},
/// Used to subscibe to a QMP event
QmpSubscribeToEvent {
event_name: String,
tx: mpsc::Sender<serde_json::Value>,
},
}
struct QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
stream: BufStream<S>,
state: QmpClientState,
rx: mpsc::Receiver<QmpActorMessage>,
// used to spit out events to subscribed clients
event_tx: HashMap<String, mpsc::Sender<serde_json::Value>>,
/// Used for QMP execute messages. This allows us to have multiple commands
/// in flight, and to manually send to all of them. Pretty nifty.
last_response_id: u32,
/// All in flight responses.
response_tx: HashMap<u32, oneshot::Sender<serde_json::Value>>,
}
impl<S> QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self {
Self {
stream: BufStream::new(stream),
state: QmpClientState::ConnectedToServer,
rx: rx,
event_tx: HashMap::new(),
last_response_id: 0,
response_tx: HashMap::new(),
}
}
async fn handshake(&mut self) -> result::Result<types::QmpHandshake> {
self.state = QmpClientState::HandshakingServer;
let handshake_as_string = io::read_qmp_line(&mut self.stream).await?;
let handshake_json =
serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?;
// Now let's send the handshake. If this errors we're done anyways
let handshake_response = serde_json::json!({
"execute": "qmp_capabilities"
});
let serialized = serde_json::ser::to_string(&handshake_response)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
// Read what the server responded back
// FIXME: Make sure it's not an error.
let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?;
// let handshake_server_response_json =
// serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?;
self.state = QmpClientState::Ready;
Ok(handshake_json)
}
async fn handle_message(&mut self, msg: QmpActorMessage) {
match msg {
QmpActorMessage::DoClose => {
println!("Byes byes!!!")
}
QmpActorMessage::QmpDoHandshake { tx } => {
let handshake = self.handshake().await;
let _ = tx.send(handshake);
}
// this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx);
}
_ => tracing::error!("Unknown message"),
}
}
async fn run(mut self) {
loop {
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
let res = serde_json::from_str::<serde_json::Value>(val.as_str()).expect("why it not json fucker bing");
// Check if this is an event
if let Some(event) = res.get("event") {
let str = String::from(event.as_str().unwrap());
// Send the event to the subscriber
if let Some(tx) = self.event_tx.get(&str) {
let _ = tx.send(res).await;
}
}
}
Err(err) => { println!("stream error {err}"); break }
}
},
msg_opt = self.rx.recv() => {
if let Some(msg) = msg_opt {
self.handle_message(msg).await;
} else {
break
}
}
else => break
}
}
}
}
/// A client for the Qemu Machine Protocol (also known as QMP).
/// Generic so it works with any Tokio stream type (which fits QEMU's ability to run
/// QMP across many protocols).
///
/// This struct is essentially a handle to an actor.
/// I might maybe refactor this or make some generic "actor" thingmabob but for now
/// it's all in here.
#[derive(Clone)]
pub struct QmpClient {
tx: mpsc::Sender<QmpActorMessage>,
}
impl QmpClient {
/// Creates a new QMP client.
///
/// ## Notes
/// The stream provided **MUST NOT** have been shut down before being provided (besides, why would you even do that?)
pub fn new<S>(socket: S) -> Self
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move { inner.run().await });
Self { tx }
}
// TODO handle errors properly
pub async fn handshake(&self) -> result::Result<types::QmpHandshake> {
let (tx, rx) = oneshot::channel();
let _ = self.tx
.send(QmpActorMessage::QmpDoHandshake { tx: tx })
.await;
match rx.await {
Ok(res) => res,
Err(err) => {
println!("???? {err}");
Err(result::QmpError::GeneralFail)
}
}
}
/// Subscribes to a QMP event.
/// Returns the recvieving arm of a [mpsc::channel] which will recieve
/// the JSON event payloads when an event is sent by the server.
pub async fn subscribe_to_event(
&self,
event_name: String,
) -> result::Result<mpsc::Receiver<serde_json::Value>> {
let (tx, rx) = mpsc::channel(8);
let _ = self.tx
.send(QmpActorMessage::QmpSubscribeToEvent {
event_name: event_name,
tx: tx,
})
.await;
// give the user their rx
Ok(rx)
}
}

36
src/qmp/io.rs Normal file
View file

@ -0,0 +1,36 @@
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use super::result;
/// Reads a single QMP delimited line.
pub async fn read_qmp_line<S>(stream: &mut BufStream<S>) -> result::Result<String>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
let mut buf = String::new();
let n = stream.read_line(&mut buf).await?;
// should probably actually disconnect
if n == 0 {
return Err(result::QmpError::IoError(std::io::Error::from(
std::io::ErrorKind::UnexpectedEof,
)));
}
Ok(buf)
}
pub async fn write_qmp_line<S>(
stream: &mut BufStream<S>,
string: &String,
) -> result::Result<()>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
let delimited = format!("{}\n", string);
stream.write_all(delimited.as_bytes()).await?;
stream.flush().await?;
Ok(())
}

5
src/qmp/mod.rs Normal file
View file

@ -0,0 +1,5 @@
pub(crate) mod types;
pub mod result;
pub(crate) mod io;
pub mod client;
pub use client::*;

19
src/qmp/result.rs Normal file
View file

@ -0,0 +1,19 @@
use thiserror::Error;
/// QMP library error.
#[derive(Error, Debug)]
pub enum QmpError {
#[error(transparent)]
IoError(#[from] tokio::io::Error),
#[error(transparent)]
DecodingError(#[from] std::string::FromUtf8Error),
#[error(transparent)]
JsonError(#[from] serde_json::Error),
#[error("general failure")]
GeneralFail,
}
pub type Result<T> = std::result::Result<T, QmpError>;

28
src/qmp/types.rs Normal file
View file

@ -0,0 +1,28 @@
//! Strong typed QMP messages (For the most part)...
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeSemVer {
pub major: i32,
pub minor: i32,
pub micro: i32,
}
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeQMPVersion {
pub qemu: QmpHandshakeSemVer,
pub package: String,
}
/// "QMP" object
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeQMP {
pub version: QmpHandshakeQMPVersion,
pub capabilities: Vec<String>,
}
#[derive(Deserialize, Debug)]
pub struct QmpHandshake {
#[serde(rename = "QMP")]
pub qmp: QmpHandshakeQMP,
}

9
src/vm.rs Normal file
View file

@ -0,0 +1,9 @@
struct VMUser {
}
// holds a qemuvm::QemuVM, (maybe) broad
pub struct VM {
}