initial version
This commit is contained in:
commit
c2918ffc87
9 changed files with 2783 additions and 0 deletions
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
config.toml
|
2176
Cargo.lock
generated
Normal file
2176
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
23
Cargo.toml
Normal file
23
Cargo.toml
Normal file
|
@ -0,0 +1,23 @@
|
|||
[package]
|
||||
name = "cvmshot"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.89"
|
||||
aws-lc-rs = "1.9.0"
|
||||
base64 = "0.22.1"
|
||||
chrono = "0.4.38"
|
||||
futures = "0.3.30"
|
||||
image = { version = "0.25.2" }
|
||||
rustls = "0.23.13"
|
||||
serde = { version = "1.0.210", features = ["derive"] }
|
||||
tokio = { version = "1.40.0", features = ["full"] }
|
||||
tokio-tungstenite = { version = "0.24.0", features = [
|
||||
"rustls-tls-native-roots",
|
||||
] }
|
||||
toml = "0.8.19"
|
||||
tracing = "0.1.40"
|
||||
tracing-subscriber = "0.3.18"
|
||||
tzfile = "0.1.3"
|
||||
webp = "0.3.0"
|
19
LICENSE
Normal file
19
LICENSE
Normal file
|
@ -0,0 +1,19 @@
|
|||
Copyright 2024 Lily Tsuru/modeco80 <lily.modeco80@protonmail.ch>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
11
README.md
Normal file
11
README.md
Normal file
|
@ -0,0 +1,11 @@
|
|||
# cvmshot
|
||||
|
||||
a simple CollabVM screenshot logger thing.
|
||||
|
||||
# Usage
|
||||
|
||||
- Configure config.toml from config.toml.example
|
||||
- `cargo b --release`
|
||||
- run `target/release/cvmshot`
|
||||
- ...
|
||||
- screenshots?
|
13
config.toml.example
Normal file
13
config.toml.example
Normal file
|
@ -0,0 +1,13 @@
|
|||
# The root directory of your screenshots
|
||||
root_path = "/home/user/cvmshots/"
|
||||
|
||||
# The WebP quality
|
||||
webp_quality = 45.0
|
||||
|
||||
# Configure nodes here:
|
||||
|
||||
# the key is the node name
|
||||
# Inside, url is the websocket url to the VM
|
||||
# the origin is the origin URI
|
||||
[vms]
|
||||
myvm1 = { url = "wss://myvm.xyz/myvm1", origin = "https://myvm.xyz" }
|
204
src/guac.rs
Normal file
204
src/guac.rs
Normal file
|
@ -0,0 +1,204 @@
|
|||
use std::fmt;
|
||||
|
||||
// type of a guac message
|
||||
pub type Elements = Vec<String>;
|
||||
|
||||
// FIXME: thiserror, please.
|
||||
|
||||
/// Errors during decoding
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DecodeError {
|
||||
/// Invalid guacamole instruction format
|
||||
InvalidFormat,
|
||||
|
||||
/// Instruction is too long for the current decode policy.
|
||||
InstructionTooLong,
|
||||
|
||||
/// Element is too long for the current decode policy.
|
||||
ElementTooLong,
|
||||
|
||||
/// Invalid element size.
|
||||
ElementSizeInvalid,
|
||||
}
|
||||
|
||||
pub type DecodeResult<T> = std::result::Result<T, DecodeError>;
|
||||
|
||||
impl fmt::Display for DecodeError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::InvalidFormat => write!(f, "Invalid Guacamole instruction while decoding"),
|
||||
Self::InstructionTooLong => write!(f, "Instruction too long for current decode policy"),
|
||||
Self::ElementTooLong => write!(f, "Element too long for current decode policy"),
|
||||
Self::ElementSizeInvalid => write!(f, "Element size is invalid"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DecodeError {}
|
||||
|
||||
/// A decoder with a given maximum instruction/element size pair.
|
||||
pub struct ConfigurableDecoder<const MAX_INST_SIZE: usize, const MAX_ELEM_SIZE: usize>();
|
||||
|
||||
impl<const MAX_INST_SIZE: usize, const MAX_ELEM_SIZE: usize>
|
||||
ConfigurableDecoder<MAX_INST_SIZE, MAX_ELEM_SIZE>
|
||||
{
|
||||
fn max_instruction_size() -> usize {
|
||||
MAX_INST_SIZE
|
||||
}
|
||||
|
||||
fn max_element_size() -> usize {
|
||||
MAX_ELEM_SIZE
|
||||
}
|
||||
|
||||
/// Decodes a Guacamole instruction to individual elements
|
||||
pub fn decode(element_string: &String) -> DecodeResult<Elements> {
|
||||
let mut vec: Elements = Vec::new();
|
||||
let mut current_position: usize = 0;
|
||||
|
||||
// Instruction is too long. Don't even bother
|
||||
if Self::max_instruction_size() < element_string.len() {
|
||||
return Err(DecodeError::InstructionTooLong);
|
||||
}
|
||||
|
||||
let chars = element_string.chars().collect::<Vec<_>>();
|
||||
|
||||
loop {
|
||||
let mut element_size: usize = 0;
|
||||
|
||||
// Scan the integer value in by hand. This is mostly because
|
||||
// I'm stupid, and the Rust integer parsing routines (seemingly)
|
||||
// require a substring (or a slice, but, if you can generate a slice,
|
||||
// you can also just scan the value in by hand.)
|
||||
//
|
||||
// We bound this anyways and do quite the checks, so even though it's not great,
|
||||
// it should be generally fine (TM).
|
||||
loop {
|
||||
let c = chars[current_position];
|
||||
|
||||
if c >= '0' && c <= '9' {
|
||||
element_size = element_size * 10 + (c as usize) - ('0' as usize);
|
||||
} else {
|
||||
if c == '.' {
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(DecodeError::InvalidFormat);
|
||||
}
|
||||
current_position += 1;
|
||||
}
|
||||
|
||||
// Eat the '.' seperating the size and the element data;
|
||||
// our integer scanning ensures we only get here in the case that this is actually the '.'
|
||||
// character.
|
||||
current_position += 1;
|
||||
|
||||
// Make sure the element size doesn't overflow the decode policy
|
||||
// or the size of the whole instruction.
|
||||
|
||||
if element_size >= Self::max_element_size() {
|
||||
return Err(DecodeError::ElementTooLong);
|
||||
}
|
||||
|
||||
if element_size >= element_string.len() {
|
||||
return Err(DecodeError::ElementSizeInvalid);
|
||||
}
|
||||
|
||||
// cutoff elements or something
|
||||
if current_position + element_size > chars.len() - 1 {
|
||||
//println!("? {current_position} a {}", chars.len());
|
||||
return Err(DecodeError::InvalidFormat);
|
||||
}
|
||||
|
||||
let element = chars
|
||||
.iter()
|
||||
.skip(current_position)
|
||||
.take(element_size)
|
||||
.collect::<String>();
|
||||
|
||||
current_position += element_size;
|
||||
|
||||
vec.push(element);
|
||||
|
||||
// make sure seperator is proper
|
||||
match chars[current_position] {
|
||||
',' => {}
|
||||
';' => break,
|
||||
_ => return Err(DecodeError::InvalidFormat),
|
||||
}
|
||||
|
||||
// eat the ','
|
||||
current_position += 1;
|
||||
}
|
||||
|
||||
Ok(vec)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Encoder();
|
||||
|
||||
impl Encoder {
|
||||
/// Encodes elements into a Guacamole instruction
|
||||
pub fn encode(elements: &Elements) -> String {
|
||||
let mut str = String::new();
|
||||
|
||||
for elem in elements.iter() {
|
||||
str.push_str(&format!("{}.{},", elem.len(), elem));
|
||||
}
|
||||
|
||||
// hacky, but whatever
|
||||
str.pop();
|
||||
str.push(';');
|
||||
|
||||
str
|
||||
}
|
||||
|
||||
// FIXME: piecemeal api. We can use a string or something
|
||||
}
|
||||
|
||||
/// The default decoder.
|
||||
/// Allows a 2MB max message, with a 1 MB max element size.
|
||||
pub type Decoder = ConfigurableDecoder<
|
||||
// prev was 12288, 4096
|
||||
// which is ~12kb, 4kb max
|
||||
{ 2 * (1024 * 1024) },
|
||||
{ 1024 * 1024 },
|
||||
>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn decode_basic() {
|
||||
let test = String::from("7.connect,3.vm1;");
|
||||
let res = Decoder::decode(&test);
|
||||
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), vec!["connect", "vm1"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_errors() {
|
||||
let test = String::from("700.connect,3.vm1;");
|
||||
let res = Decoder::decode(&test);
|
||||
|
||||
eprintln!("Error for: {}", res.clone().unwrap_err());
|
||||
|
||||
assert!(res.is_err())
|
||||
}
|
||||
|
||||
// generally just test that the codec even works
|
||||
// (we can decode a instruction we created)
|
||||
#[test]
|
||||
fn general_codec_works() {
|
||||
let vec = vec![String::from("connect"), String::from("vm1")];
|
||||
let test = Encoder::encode(&vec);
|
||||
|
||||
assert_eq!(test, "7.connect,3.vm1;");
|
||||
|
||||
let res = Decoder::decode(&test);
|
||||
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), vec);
|
||||
}
|
||||
}
|
104
src/main.rs
Normal file
104
src/main.rs
Normal file
|
@ -0,0 +1,104 @@
|
|||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use chrono::Timelike;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use tracing::{Instrument, Level};
|
||||
use tracing_subscriber::FmtSubscriber;
|
||||
|
||||
use serde::Deserialize;
|
||||
use toml;
|
||||
|
||||
mod guac;
|
||||
mod shotter;
|
||||
|
||||
fn duration_until_next_minute() -> Duration {
|
||||
let now = chrono::Utc::now();
|
||||
let interval = (60 - now.second()) * 1000;
|
||||
|
||||
// not sure if this ever triggers
|
||||
if interval == 0 {
|
||||
return Duration::from_secs(59);
|
||||
}
|
||||
|
||||
return Duration::from_millis(interval as u64);
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
struct Node {
|
||||
url: String,
|
||||
origin: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Config {
|
||||
root_path: std::path::PathBuf,
|
||||
|
||||
webp_quality: f32,
|
||||
|
||||
vms: HashMap<String, Node>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
#[cfg(debug_assertions)]
|
||||
let subscriber = FmtSubscriber::builder()
|
||||
.with_max_level(Level::TRACE)
|
||||
.compact()
|
||||
.finish();
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
let subscriber = FmtSubscriber::builder()
|
||||
.with_max_level(Level::INFO)
|
||||
.compact()
|
||||
.finish();
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
|
||||
let config: Config = toml::from_str(std::fs::read_to_string("./config.toml")?.as_str())?;
|
||||
|
||||
// Essentially this is meant to be a sentinel for "SCREENSHOT NOW!"
|
||||
let (tx, _) = tokio::sync::broadcast::channel::<Option<()>>(10);
|
||||
|
||||
for (id, node) in config.vms.iter() {
|
||||
tracing::info!("Adding node {id} : {:?}", node);
|
||||
|
||||
let mut clone = tx.subscribe();
|
||||
let path = config.root_path.join(id);
|
||||
let id_clone = id.clone();
|
||||
let node_clone = node.clone();
|
||||
|
||||
let _: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||
while let Some(_) = clone.recv().await? {
|
||||
let span = tracing::span!(
|
||||
Level::INFO,
|
||||
"node screenshot",
|
||||
// FIXME: This should NOT be hardcoded
|
||||
node = id_clone.as_str()
|
||||
);
|
||||
|
||||
shotter::take_one_screenshot(
|
||||
&node_clone.url,
|
||||
&node_clone.origin,
|
||||
&id_clone,
|
||||
path.clone(),
|
||||
config.webp_quality,
|
||||
)
|
||||
.instrument(span)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
loop {
|
||||
tx.send(Some(()))?;
|
||||
|
||||
let dur = duration_until_next_minute();
|
||||
tracing::info!("Waiting {:?}", dur);
|
||||
tokio::time::sleep(dur).await;
|
||||
}
|
||||
}
|
231
src/shotter.rs
Normal file
231
src/shotter.rs
Normal file
|
@ -0,0 +1,231 @@
|
|||
use std::io::{BufReader, Cursor, Write};
|
||||
|
||||
use base64::{
|
||||
engine::general_purpose::{GeneralPurpose, GeneralPurposeConfig},
|
||||
Engine,
|
||||
};
|
||||
use chrono::{Datelike, Timelike};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tracing::Instrument;
|
||||
|
||||
use crate::guac;
|
||||
|
||||
//#[tracing::instrument(level = "info")]
|
||||
// Not sure I want to instrument EVERYTHING
|
||||
pub async fn take_one_screenshot(
|
||||
url: &String,
|
||||
origin: &String,
|
||||
node_name: &String,
|
||||
root_path: std::path::PathBuf,
|
||||
webp_quality: f32,
|
||||
) -> anyhow::Result<()> {
|
||||
// Don't particularly like this syntax but whatever
|
||||
let mut req = url.into_client_request()?;
|
||||
|
||||
let map = req.headers_mut();
|
||||
|
||||
map.insert("Origin", origin.parse().unwrap());
|
||||
map.insert("Sec-WebSocket-Protocol", "guacamole".parse().unwrap());
|
||||
|
||||
match tokio_tungstenite::connect_async(req).await {
|
||||
Ok(ws) => {
|
||||
tracing::trace!("connected to CollabVM server");
|
||||
|
||||
let (mut tx, mut rx) = ws.0.split();
|
||||
let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<String>(32);
|
||||
|
||||
let (guac_handler_tx, mut guac_handler_rx) =
|
||||
tokio::sync::mpsc::channel::<Vec<String>>(32);
|
||||
|
||||
// Add an initial "view" request. We could support caps but /shrug
|
||||
// Note that this can optionally be replaced with connect,[node]
|
||||
// but hey, being the first to use view,[node],1 is a good thing
|
||||
write_tx
|
||||
.send(guac::Encoder::encode(&vec![
|
||||
"view".into(),
|
||||
node_name.clone(),
|
||||
"1".into(),
|
||||
]))
|
||||
.await?;
|
||||
|
||||
let read: JoinHandle<anyhow::Result<()>> = tokio::spawn(
|
||||
async move {
|
||||
while let Some(msg) = rx.next().await {
|
||||
if msg.is_err() {
|
||||
return Err(anyhow::anyhow!("WebSocket protocol error"));
|
||||
} else if msg.is_ok() {
|
||||
let message = msg.unwrap();
|
||||
if message.is_text() {
|
||||
let decoded = guac::Decoder::decode(&message.into_text()?)?;
|
||||
guac_handler_tx.send(decoded).await?;
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("unexpected frame"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
let write: JoinHandle<anyhow::Result<()>> = tokio::spawn(
|
||||
async move {
|
||||
while let Some(msg) = write_rx.recv().await {
|
||||
tx.send(tokio_tungstenite::tungstenite::Message::text(msg))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
// main handler
|
||||
let write_guac_handler_clone = write_tx.clone();
|
||||
let guac_handler: JoinHandle<anyhow::Result<()>> = tokio::spawn(
|
||||
async move {
|
||||
while let Some(msg) = guac_handler_rx.recv().await {
|
||||
match msg[0].as_str() {
|
||||
"nop" => {
|
||||
tracing::trace!("Sending nop to CollabVM server");
|
||||
write_guac_handler_clone.send("3.nop;".into()).await?;
|
||||
}
|
||||
|
||||
"size" => {
|
||||
if msg.len() != 4 {
|
||||
return Err(anyhow::anyhow!("???"));
|
||||
}
|
||||
|
||||
if msg[1].as_str() == "0" {
|
||||
let expected_width: u32 = msg[2].parse()?;
|
||||
let expected_height: u32 = msg[3].parse()?;
|
||||
tracing::trace!("Screen is {expected_width}x{expected_height}");
|
||||
}
|
||||
}
|
||||
|
||||
"png" => {
|
||||
// Layer 0 is the screen (it's also the only layer cvmts sends)
|
||||
if msg[1].as_str() == "0" {
|
||||
if msg[3].as_str() == "0" && msg[4].as_str() == "0" {
|
||||
// same as darok's stuff except i'm not keeping nanos
|
||||
let zone = tzfile::Tz::named("GMT")?;
|
||||
let now = { chrono::Utc::now().with_timezone(&&zone) };
|
||||
|
||||
let date_path = root_path.join(format!(
|
||||
"{:02}-{:02}-{:02}",
|
||||
now.year(),
|
||||
now.month(),
|
||||
now.day()
|
||||
));
|
||||
|
||||
if !date_path.exists() {
|
||||
std::fs::create_dir_all(&date_path)?;
|
||||
}
|
||||
|
||||
let file_path = date_path.join(format!(
|
||||
"{:02}-{:02}-{:02}.webp",
|
||||
now.hour(),
|
||||
now.minute(),
|
||||
now.second()
|
||||
));
|
||||
|
||||
let file_path_clone = file_path.clone();
|
||||
|
||||
let _: anyhow::Result<()> =
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let image = {
|
||||
// this api genuinely sucks
|
||||
let engine = GeneralPurpose::new(
|
||||
&base64::alphabet::STANDARD,
|
||||
GeneralPurposeConfig::default(),
|
||||
);
|
||||
let decoded = engine.decode(&msg[5])?;
|
||||
|
||||
let mut reader =
|
||||
BufReader::new(Cursor::new(&decoded));
|
||||
|
||||
image::load(
|
||||
&mut reader,
|
||||
image::ImageFormat::Jpeg,
|
||||
)?
|
||||
};
|
||||
|
||||
// this library sucks but it's the only one that binds libwebp directly
|
||||
// and lets me not use lossless encoding.
|
||||
// string errors are asanine
|
||||
let encoder = webp::Encoder::from_image(&image)
|
||||
.expect("fuck you");
|
||||
|
||||
let mut file =
|
||||
std::fs::File::create(file_path_clone)?;
|
||||
|
||||
{
|
||||
let encoded = encoder
|
||||
.encode_simple(false, webp_quality)
|
||||
.expect("FUCK");
|
||||
file.write_all(&encoded)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
tracing::info!(
|
||||
"Screenshot written to \"{}\"",
|
||||
file_path.display()
|
||||
);
|
||||
|
||||
// no longer needed, everyone will close now!
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
tokio::select! {
|
||||
read_res = read => {
|
||||
let res = read_res?;
|
||||
if res.is_err() {
|
||||
tracing::error!("WebSocket read task failure: {:?}", res);
|
||||
return res;
|
||||
}
|
||||
},
|
||||
|
||||
write_res = write => {
|
||||
let res = write_res?;
|
||||
if res.is_err() {
|
||||
tracing::error!("WebSocket write task failure: {:?}", res);
|
||||
return res;
|
||||
}
|
||||
},
|
||||
|
||||
guac_res = guac_handler => {
|
||||
let res = guac_res?;
|
||||
if res.is_err() {
|
||||
tracing::error!("CollabVM protocol failure: {:?}", res);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
tracing::error!("Internal error: {:?}", err);
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue