Compare commits

...

4 commits

Author SHA1 Message Date
0e017cd00d qmp: gate off execute() calls until ready 2024-05-09 01:00:12 -04:00
51c6356261 qmp: split actor into new actor module
logic seperation, bla bla.
2024-05-09 00:55:14 -04:00
a37d2468c2 qmp: cleanup pass + executing commands works
Some more helpers (like, `execute_hmp("info block".into())` might be nice to add, for now this works.
2024-05-09 00:48:42 -04:00
4cda248591 qmp: working qmp stuff
I really should probably maybehapsibly split this out into a cargo workspace but /shrug, I'll do it if this gets too annoying.

I've split out a lot of code into new modules, just to be cleaner about organization.
2024-05-08 21:33:09 -04:00
16 changed files with 1841 additions and 40 deletions

1273
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,3 +4,14 @@ version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.37.0", features = ["full"] }
axum = { version = "0.7.5", features = ["tokio"] }
turbojpeg = { version = "1.0.1" }
vnc-rs = "0.5.1"
toml_edit = "0.22.9"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
thiserror = "1.0.58"
serde = { version = "1.0.197", features = [ "derive" ] }
serde_json = "1.0.115"

View file

@ -6,39 +6,6 @@ Stopgap part 4. Mostly made as a experiment, and maybe for production use?.
There is none, at the moment. Once there is, there will be actual documentation here.
## Noodling for configuration
collabvm.toml
```toml
[collabvm]
listen = "127.0.0.1:3000" # or /run/cvm.sock ? if warp supports uds i guess
# optional
vm_directory = "vms"
# nested table. you can define it non-nested if you want
mod_permissions = { xxx = true, xyz = false }
mod_password_hash = "argon2 hash"
admin_password_hash = "argon2 hash"
# applied to vms by default unless they override
default_motd = "hice powered vms"
ban_command = "sexy $user $ip"
```
vms/vm1.toml
```toml
[vm]
name = "Window XP have honse (VM 1)"
# cvm1.2-rs will automatically append parameters as needed
command_line = "./vm1.lvm start"
motd = "YOU HAVE FUCKER GOOGLE FROM 999999999.. Banned!"
```
# Building
`cargo b --release`

29
doc/collabvm.toml Normal file
View file

@ -0,0 +1,29 @@
listen = "127.0.0.1:3000" # or /run/cvm.sock ? if warp supports uds i guess
# optional
vm_directory = "vms"
# nested table. you can define it non-nested if you want
[mod_permissions]
xxx = false
xyz = true
# passwords hash
mod_password_hash = "md5 hash of password"
admin_password_hash = "md5 hash of password"
# Command to run to ban a user
# $user is replaced with the user, and
# $ip is replaced with the ip of the user
ban_command = "sexy $user $ip"
# defaults for all VMs
[default_permissions]
turns_enabled = true
votes_enabled = true
turn_time = 18
vote_time = 60
vote_cooldown_time = 120
# applied to all vms by default unless they override it in their toml file
motd = "Welcome to my UserVM!"

16
doc/vm.toml Normal file
View file

@ -0,0 +1,16 @@
# A descriptive name.
name = "Windows XP SP3 x86 (MyVM 1)"
# cvm1.2-rs will automatically append parameters as needed to the command line
command_line = "/uvm/vms/vm1.lvm start"
# (Optional) Override the MOTD set in the configuration for this VM only.
motd = "VM 1 MOTD"
# more overrides.
#turns_enabled = true
#votes_enabled = true
#turn_time = 18
#vote_time = 60
#vote_cooldown_time = 120

View file

@ -5,6 +5,8 @@ use std::fmt;
// type of a guac message
pub type Elements = Vec<String>;
// FIXME: thiserror, please.
/// Errors during decoding
#[derive(Debug, Clone)]
pub enum DecodeError {
@ -90,7 +92,8 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult<Elements> {
// require a substring (or a slice, but if you can generate a slice bound,
// you can also just scan the value in by hand.....)
//
// We bound this anyways so it's fine(TM)
// We bound this anyways and do quite the checks, so even though it's not great,
// it should be generally fine (TM).
loop {
let c = chars[current_position];
@ -106,10 +109,14 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult<Elements> {
current_position += 1;
}
// eat the '.'; our integer scanning ensures
// we only get here in that case
// Eat the '.' seperating the size and the element data;
// our integer scanning ensures we only get here in the case that this is actually the '.'
// character.
current_position += 1;
// Make sure the element size doesn't overflow the decode policy
// or the size of the whole instruction.
if element_size >= policy.max_element_size() {
return Err(DecodeError::ElementTooLong);
}

View file

@ -1 +1,4 @@
pub mod guac;
pub mod guac;
pub mod qmp;
pub mod qemuvm;
pub mod vm;

View file

@ -1,5 +1,45 @@
use cvm12_rs::guac;
use cvm12_rs::qmp;
fn main() {
println!("Hello, world!");
use tokio::net::UnixStream;
#[tokio::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("You Banned");
// Create a stream to connect
let stream = UnixStream::connect("/home/lily/vms/xpiss/qmp.sock")
.await
.expect("Could not connect");
let client = qmp::QmpClient::new(stream);
// Handshake QMP
client.handshake().await.expect("Could not handshake QMP");
println!("Connected to QMP server");
// let's try to get all STOP events from QMP now.
let mut rx = client.subscribe_to_event("STOP".into()).await.unwrap();
// If this worked, then now we can recieve all the events.
// This code here allows running a VM with the -no-shutdown option,
// automatially resetting it on shutdown.
while let Some(message) = rx.recv().await {
println!("Got stop event! {:?}", message);
let res1 = client.execute("system_reset".into(), None).await.expect("FUCK");
let res2 = client.execute("cont".into(), None).await.expect("FUCK");
println!("Result of running: {:?}, {:?}", res1, res2);
}
//println!("Hello, world!");
}

21
src/qemuvm.rs Normal file
View file

@ -0,0 +1,21 @@
// hacked together
pub struct QemuVM {
}
impl QemuVM {
// fn new()->Self; // spawns manager
//
}
impl Drop for QemuVM {
fn drop(&mut self) {
}
}

227
src/qmp/actor.rs Normal file
View file

@ -0,0 +1,227 @@
use std::collections::HashMap;
// TODO
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufStream},
sync::{mpsc, oneshot},
};
use super::{
io, result,
types::{self, QmpExecute, QmpHandshake},
};
#[derive(PartialEq, Eq)]
enum QmpActorState {
/// Disconnected.
Disconnected,
ConnectedToServer,
/// Handshake passed successfully and we're ready.
Ready,
}
/// Actor message type.
pub(crate) enum QmpActorMessage {
/// Close
QmpDoClose,
QmpDoHandshake {
/// The oneshot response.
tx: oneshot::Sender<super::result::Result<QmpHandshake>>,
},
/// Used to send a QMP message.
QmpDoSend {
/// The message to send
command: String,
arguments: Option<serde_json::Value>,
/// The oneshot response.
tx: oneshot::Sender<serde_json::Value>,
},
/// Used to subscibe to a QMP event
QmpSubscribeToEvent {
event_name: String,
tx: mpsc::Sender<serde_json::Value>,
},
}
pub(crate) struct QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
stream: BufStream<S>,
state: QmpActorState,
rx: mpsc::Receiver<QmpActorMessage>,
// used to spit out events to subscribed clients
event_tx: HashMap<String, mpsc::Sender<serde_json::Value>>,
/// Used for QMP execute messages. This allows us to have multiple commands
/// in flight, and to manually send to all of them. Pretty nifty.
last_response_id: u32,
/// All in flight responses.
response_tx: HashMap<u32, Option<oneshot::Sender<serde_json::Value>>>,
}
impl<S> QmpClientActor<S>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
fn new(stream: S, rx: mpsc::Receiver<QmpActorMessage>) -> Self {
Self {
stream: BufStream::new(stream),
state: QmpActorState::ConnectedToServer,
rx: rx,
event_tx: HashMap::new(),
last_response_id: 0,
response_tx: HashMap::new(),
}
}
async fn handshake(&mut self) -> result::Result<types::QmpHandshake> {
let handshake_as_string = io::read_qmp_line(&mut self.stream).await?;
let handshake_json =
serde_json::from_str::<types::QmpHandshake>(handshake_as_string.as_str())?;
// Now let's send the handshake. If this errors we're done anyways
let handshake_response = serde_json::json!({
"execute": "qmp_capabilities"
});
let serialized = serde_json::ser::to_string(&handshake_response)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
// Read what the server responded back
// FIXME: Make sure it's not an error.
let _handshake_server_response_as_string = io::read_qmp_line(&mut self.stream).await?;
// let handshake_server_response_json =
// serde_json::from_str::<serde_json::Value>(handshake_server_response_as_string.as_str())?;
self.state = QmpActorState::Ready;
Ok(handshake_json)
}
async fn handle_message(&mut self, msg: QmpActorMessage) -> result::Result<()> {
match msg {
QmpActorMessage::QmpDoClose => {
// FIXME: Actually close things.
}
QmpActorMessage::QmpDoHandshake { tx } => {
let handshake = self.handshake().await;
let _ = tx.send(handshake);
}
// this can only really fail in a way where it panics i'm pretty sure
QmpActorMessage::QmpSubscribeToEvent { event_name, tx } => {
self.event_tx.insert(event_name.to_uppercase(), tx);
},
QmpActorMessage::QmpDoSend { command, arguments, tx } => {
if self.state != QmpActorState::Ready {
return Err(result::QmpError::InvalidState);
}
self.response_tx.insert(self.last_response_id, Some(tx));
self.send_execute(command, &arguments).await?;
self.last_response_id += 1;
}
_ => tracing::error!("Unknown message"),
}
Ok(())
}
async fn handle_qmp_read(&mut self, line: &String) -> result::Result<()> {
let res = serde_json::from_str::<serde_json::Value>(line.as_str())?;
if let Some(event) = res.get("event") {
let str = String::from(event.as_str().unwrap());
// Send the event to the subscriber
if let Some(tx) = self.event_tx.get(&str) {
let _ = tx.send(res.clone()).await;
}
}
if let Some(id) = res.get("id") {
let id = id.as_u64().unwrap() as u32;
// Send the response to the oneshot channel that's expecting it then remove
// the key from the HashMap, since it's no longer needed.
if let Some(txopt) = self.response_tx.get_mut(&id) {
let tx = txopt.take().unwrap();
let _ = tx.send(res.clone());
self.response_tx.remove(&id);
}
}
Ok(())
}
async fn send_execute(&mut self, command: String, arguments: &Option<serde_json::Value>) -> result::Result<()> {
// Send
let execute = QmpExecute {
execute: command,
id: self.last_response_id,
arguments: arguments.clone()
};
let serialized = serde_json::ser::to_string(&execute)?;
io::write_qmp_line(&mut self.stream, &serialized).await?;
Ok(())
}
/// Actor runloop.
async fn run(mut self) -> result::Result<()> {
loop {
tokio::select! {
str = io::read_qmp_line(&mut self.stream) => {
match str {
Ok(val) => {
self.handle_qmp_read(&val).await?;
}
Err(err) => { tracing::error!("stream error {err}"); break }
}
},
msg_opt = self.rx.recv() => {
if let Some(msg) = msg_opt {
self.handle_message(msg).await?;
} else {
break
}
}
else => break
}
}
Ok(())
}
/// Helper function to spawn the actor.
pub(crate) fn spawn_actor(socket: S) -> mpsc::Sender<QmpActorMessage>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel(8);
let inner = QmpClientActor::new(socket, rx);
tokio::spawn(async move {
let res = inner.run().await;
res.inspect_err(|err| {
tracing::error!("error in actor runloop: {err}");
});
});
tx
}
}

85
src/qmp/client.rs Normal file
View file

@ -0,0 +1,85 @@
//! QMP client.
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{mpsc, oneshot},
};
use super::{result, actor, types};
/// A client for the Qemu Machine Protocol (also known as QMP).
/// Generic so it works with any Tokio stream type (which fits QEMU's ability to run
/// QMP across many protocols). Currently isn't very typesafe but whatever.
///
/// This struct is essentially a handle to an actor.
/// I might maybe refactor this or make some generic "actor" thingmabob but for now
/// it's all in here.
#[derive(Clone)]
pub struct QmpClient {
tx: mpsc::Sender<actor::QmpActorMessage>,
}
impl QmpClient {
/// Creates a new QMP client.
///
/// ## Notes
/// The stream provided **MUST NOT** have been shut down before being provided (besides, why would you even do that?)
pub fn new<S>(socket: S) -> Self
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
{
Self {
tx: actor::QmpClientActor::spawn_actor(socket),
}
}
// TODO handle errors properly
pub async fn handshake(&self) -> result::Result<types::QmpHandshake> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(actor::QmpActorMessage::QmpDoHandshake { tx: tx })
.await;
match rx.await {
Ok(res) => res,
Err(err) => Err(result::QmpError::GeneralFail),
}
}
pub async fn execute(&self, command: String, arguments: Option<serde_json::Value>) -> result::Result<serde_json::Value> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(actor::QmpActorMessage::QmpDoSend { command: command, arguments: arguments.clone(), tx: tx })
.await;
match rx.await {
Ok(res) => Ok(res),
Err(err) => Err(result::QmpError::GeneralFail),
}
}
/// Subscribes to a QMP event.
/// Returns the recvieving arm of a [mpsc::channel] which will recieve
/// the JSON event payloads when an event is sent by the server.
pub async fn subscribe_to_event(
&self,
event_name: String,
) -> result::Result<mpsc::Receiver<serde_json::Value>> {
let (tx, rx) = mpsc::channel(8);
let _ = self
.tx
.send(actor::QmpActorMessage::QmpSubscribeToEvent {
event_name: event_name,
tx: tx,
})
.await;
// give the user their rx
Ok(rx)
}
}

42
src/qmp/io.rs Normal file
View file

@ -0,0 +1,42 @@
//! Low-level IO routines
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use super::result;
/// Reads a single QMP delimited line.
pub async fn read_qmp_line<S>(stream: &mut BufStream<S>) -> result::Result<String>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
let mut buf = String::new();
let n = stream.read_line(&mut buf).await?;
// should probably actually disconnect
if n == 0 {
return Err(result::QmpError::IoError(std::io::Error::from(
std::io::ErrorKind::UnexpectedEof,
)));
}
Ok(buf)
}
pub async fn write_qmp_line<S>(
stream: &mut BufStream<S>,
string: &String,
) -> result::Result<()>
where
S: AsyncReadExt + AsyncWriteExt + Unpin + Send,
{
// rough but whatever
let delimited = format!("{}\r\n", string);
stream.write_all(delimited.as_bytes()).await?;
// FIXME: We probably shouldn't do this all the time but I mean
// QMP isn't that high bandwidth so it's probably fine.
stream.flush().await?;
Ok(())
}

8
src/qmp/mod.rs Normal file
View file

@ -0,0 +1,8 @@
//! QMP client lib
pub(crate) mod types;
pub mod result;
pub(crate) mod io;
pub(crate) mod actor;
pub mod client;
pub use client::*;

23
src/qmp/result.rs Normal file
View file

@ -0,0 +1,23 @@
use thiserror::Error;
/// QMP library error.
/// FIXME: more arms
#[derive(Error, Debug)]
pub enum QmpError {
#[error(transparent)]
IoError(#[from] tokio::io::Error),
#[error(transparent)]
DecodingError(#[from] std::string::FromUtf8Error),
#[error(transparent)]
JsonError(#[from] serde_json::Error),
#[error("invalid state for operation")]
InvalidState,
#[error("general failure")]
GeneralFail,
}
pub type Result<T> = std::result::Result<T, QmpError>;

40
src/qmp/types.rs Normal file
View file

@ -0,0 +1,40 @@
//! Strong typed QMP messages (For the most part)...
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeSemVer {
pub major: i32,
pub minor: i32,
pub micro: i32,
}
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeQMPVersion {
pub qemu: QmpHandshakeSemVer,
pub package: String,
}
/// "QMP" object
#[derive(Deserialize, Debug)]
pub struct QmpHandshakeQMP {
pub version: QmpHandshakeQMPVersion,
pub capabilities: Vec<String>,
}
#[derive(Deserialize, Debug)]
pub struct QmpHandshake {
#[serde(rename = "QMP")]
pub qmp: QmpHandshakeQMP,
}
#[derive(Serialize, Debug)]
pub struct QmpExecute {
pub execute: String,
/// Really, this is defined in the QMP as *any* JSON value,
/// but we only use a u32 here.
pub id: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: Option<serde_json::Value>
}

9
src/vm.rs Normal file
View file

@ -0,0 +1,9 @@
struct VMUser {
}
// holds a qemuvm::QemuVM, (maybe) broad
pub struct VM {
}