backend: Upgrade to superqemu 0.3.x

This commit is contained in:
Lily Tsuru 2024-11-03 21:01:06 -05:00
parent 39e22408fe
commit 5b939879f9
13 changed files with 662 additions and 26 deletions

2
.gitignore vendored
View file

@ -8,3 +8,5 @@ node_modules/
/jpeg-rs/target
/jpeg-rs/index.node
/.yarn/**

View file

@ -15,7 +15,8 @@
}
},
"dependencies": {
"@computernewb/superqemu": "^0.1.0",
"@computernewb/nodejs-rfb": "^0.3.0",
"@computernewb/superqemu": "^0.3.0",
"@fastify/websocket": "^10.0.1",
"@socketcomputer/jpeg-rs": "*",
"@socketcomputer/shared": "*",

View file

@ -64,6 +64,9 @@ export function Slot_PCDef(
return {
id: 'socketvm1',
command: qCommand.join(' '),
forceTcp: false,
vncHost: undefined,
vncPort: undefined,
snapshot: true
};
}

View file

@ -7,6 +7,7 @@ import * as fastifyWebsocket from '@fastify/websocket';
import { SocketVM } from './SocketVM.js';
import { VMUser } from './VMUser.js';
import pino from 'pino';
import { QemuVMShim } from './qemu/qemu_vm.js';
// CONFIG types (not used yet)
export type SocketComputerConfig_VM = {
@ -59,7 +60,11 @@ export class SocketComputerServer {
let slotDef: QemuVmDefinition = Slot_PCDef('2G', '-netdev tap,ifname=ktsocket1,script=no,downscript=no,id=vm.wan', 'rtl8139', 'c0:11:ab:69:44:02', true, diskpath, 'qcow2');
// Create the VM
this.vm = new SocketVM(new QemuVM(slotDef));
this.vm = new SocketVM(new QemuVMShim(slotDef, {
cpuUsageMax: 50,
runOnCpus: [ 2 ],
periodMs: 10
}));
// Boot it up
await this.vm.Start();

View file

@ -1,7 +1,9 @@
import { EventEmitter } from 'node:events';
import { TurnQueue, UserTimeTuple, kTurnTimeSeconds } from './TurnQueue.js';
import { VMUser } from './VMUser.js';
import { QemuDisplay, QemuVM, Rect, Size, VMState } from '@computernewb/superqemu';
import { VMState } from '@computernewb/superqemu';
import { QemuVMShim } from './qemu/qemu_vm.js';
import { ExtendableTimer } from './ExtendableTimer.js';
import { kMaxUserNameLength } from '@socketcomputer/shared';
@ -9,10 +11,11 @@ import { kMaxUserNameLength } from '@socketcomputer/shared';
import * as Shared from '@socketcomputer/shared';
import pino from 'pino';
import { JPEGEncoder } from './JPEGEncoder.js';
import { Rect, Size, VMDisplay } from './display/types.js';
export class SocketVM extends EventEmitter {
private vm: QemuVM;
private display: QemuDisplay | null = null;
private vm;
private display: VMDisplay | null = null;
private timer: ExtendableTimer = new ExtendableTimer(15);
private users: Array<VMUser> = [];
@ -22,7 +25,7 @@ export class SocketVM extends EventEmitter {
name: 'Sc2VM'
});
constructor(vm: QemuVM) {
constructor(vm: QemuVMShim) {
super();
this.vm = vm;
@ -76,12 +79,27 @@ export class SocketVM extends EventEmitter {
}
});
this.vm.on('statechange', async (state: VMState) => {
let self = this;
this.vm.Events().on('statechange', async (state: VMState) => {
if (state == VMState.Started) {
this.display = this.vm.GetDisplay();
self.logger.info('VM started');
// start the display and add the events once
if (self.vm.GetDisplay() == null) {
self.vm.StartDisplay();
self.display = self.vm.GetDisplay();
self.logger.info('started display, adding events now');
// add events
self.display?.on('resize', (size: Size) => self.OnDisplayResized(size));
self.display?.on('rect', (rect: Rect) => self.OnDisplayRectangle(rect));
}
await this.VMRunning();
} else if (state == VMState.Stopped) {
this.display = null;
await this.VMStopped();
}
});
@ -95,7 +113,7 @@ export class SocketVM extends EventEmitter {
user.username = VMUser.GenerateName();
user.vm = this;
this.logger.info({user: user.username, ip: user.address}, 'User joined');
this.logger.info({ user: user.username, ip: user.address }, 'User joined');
await this.SendInitialScreen(user);
@ -127,7 +145,7 @@ export class SocketVM extends EventEmitter {
}
async RemUser(user: VMUser) {
this.logger.info({user: user.username, ip: user.address}, 'User left');
this.logger.info({ user: user.username, ip: user.address }, 'User left');
this.users.splice(this.users.indexOf(user), 1);
this.queue.TryRemove(user);
@ -192,9 +210,6 @@ export class SocketVM extends EventEmitter {
let self = this;
this.display = this.vm.GetDisplay();
this.display?.on('resize', (size: Size) => self.OnDisplayResized(size));
this.display?.on('rect', (rect: Rect) => self.OnDisplayRectangle(rect));
this.timer.Start();
}
@ -210,14 +225,13 @@ export class SocketVM extends EventEmitter {
private async OnDisplayResized(size: Size) {
let arr = await this.MakeFullScreen();
for(let user of this.users) {
for(let msg of arr) {
for (let user of this.users) {
for (let msg of arr) {
await user.SendBuffer(msg);
}
}
}
private async MakeRectData(rect: Rect) {
let display = this.vm.GetDisplay();
@ -230,7 +244,6 @@ export class SocketVM extends EventEmitter {
return encoded;
}
private async MakeFullScreen() {
let disp = this.display;
if (disp == null) return [];
@ -263,14 +276,12 @@ export class SocketVM extends EventEmitter {
return arr;
}
private async SendInitialScreen(user: VMUser) {
let arr = await this.MakeFullScreen();
for (let buffer of arr) await user.SendBuffer(buffer);
}
private async VMStopped() {
this.display = null;
await this.vm.Start();
}
}

View file

@ -0,0 +1,41 @@
import { Size, Rect } from './types.js';
export function BatchRects(size: Size, rects: Array<Rect>): Rect {
var mergedX = size.width;
var mergedY = size.height;
var mergedHeight = 0;
var mergedWidth = 0;
// can't batch these
if (rects.length == 0) {
return {
x: 0,
y: 0,
width: size.width,
height: size.height
};
}
if (rects.length == 1) {
if (rects[0].width == size.width && rects[0].height == size.height) {
return rects[0];
}
}
rects.forEach((r) => {
if (r.x < mergedX) mergedX = r.x;
if (r.y < mergedY) mergedY = r.y;
});
rects.forEach((r) => {
if (r.height + r.y - mergedY > mergedHeight) mergedHeight = r.height + r.y - mergedY;
if (r.width + r.x - mergedX > mergedWidth) mergedWidth = r.width + r.x - mergedX;
});
return {
x: mergedX,
y: mergedY,
width: mergedWidth,
height: mergedHeight
};
}

View file

@ -0,0 +1,30 @@
export type Size = {
width: number;
height: number;
};
export type Rect = {
x: number;
y: number;
width: number;
height: number;
};
import EventEmitter from 'node:events';
// events:
//
// 'connected' -> () -> on successful connection
// 'resize' -> (w, h) -> done when resize occurs
// 'rect' -> (x, y, Buffer) -> framebuffer rect (RGBA)
// 'frame' -> () -> done at end of frame
export interface VMDisplay extends EventEmitter {
Connect(): void;
Disconnect(): void;
Connected(): boolean;
Buffer(): Buffer;
Size(): Size;
MouseEvent(x: number, y: number, buttons: number): void;
KeyboardEvent(keysym: number, pressed: boolean): void;
}

149
backend/src/display/vnc.ts Normal file
View file

@ -0,0 +1,149 @@
import { VncClient } from '@computernewb/nodejs-rfb';
import { EventEmitter } from 'node:events';
import { Clamp } from '../util.js';
import { BatchRects } from './batch.js';
import { Size, Rect, VMDisplay } from './types.js';
// the FPS to run the VNC client at
// This only affects internal polling,
// if the VNC itself is sending updates at a slower rate
// the display will be at that slower rate
const kVncBaseFramerate = 60;
export type VncRect = {
x: number;
y: number;
width: number;
height: number;
};
// TODO: replace with a non-asshole VNC client (prefably one implemented
// as a part of cvm-rs)
export class VncDisplay extends EventEmitter implements VMDisplay {
private displayVnc = new VncClient({
debug: false,
fps: kVncBaseFramerate,
encodings: [
VncClient.consts.encodings.raw,
//VncClient.consts.encodings.pseudoQemuAudio,
VncClient.consts.encodings.pseudoDesktopSize
// For now?
//VncClient.consts.encodings.pseudoCursor
]
});
private vncShouldReconnect: boolean = false;
private vncConnectOpts: any;
constructor(vncConnectOpts: any) {
super();
this.vncConnectOpts = vncConnectOpts;
this.displayVnc.on('connectTimeout', () => {
this.Reconnect();
});
this.displayVnc.on('authError', () => {
this.Reconnect();
});
this.displayVnc.on('disconnect', () => {
this.Reconnect();
});
this.displayVnc.on('closed', () => {
this.Reconnect();
});
this.displayVnc.on('firstFrameUpdate', () => {
// apparently this library is this good.
// at least it's better than the two others which exist.
this.displayVnc.changeFps(kVncBaseFramerate);
this.emit('connected');
this.emit('resize', { width: this.displayVnc.clientWidth, height: this.displayVnc.clientHeight });
});
this.displayVnc.on('desktopSizeChanged', (size: Size) => {
this.emit('resize', size);
});
let rects: Rect[] = [];
this.displayVnc.on('rectUpdateProcessed', (rect: Rect) => {
rects.push(rect);
});
this.displayVnc.on('frameUpdated', (fb: Buffer) => {
// use the cvmts batcher
let batched = BatchRects(this.Size(), rects);
this.emit('rect', batched);
// unbatched (watch the performace go now)
//for(let rect of rects)
// this.emit('rect', rect);
rects = [];
this.emit('frame');
});
}
private Reconnect() {
if (this.displayVnc.connected) return;
if (!this.vncShouldReconnect) return;
// TODO: this should also give up after a max tries count
// if we fail after max tries, emit a event
this.displayVnc.connect(this.vncConnectOpts);
}
Connect() {
this.vncShouldReconnect = true;
this.Reconnect();
}
Disconnect() {
this.vncShouldReconnect = false;
this.displayVnc.disconnect();
// bye bye!
this.displayVnc.removeAllListeners();
this.removeAllListeners();
}
Connected() {
return this.displayVnc.connected;
}
Buffer(): Buffer {
return this.displayVnc.fb;
}
Size(): Size {
if (!this.displayVnc.connected)
return {
width: 0,
height: 0
};
return {
width: this.displayVnc.clientWidth,
height: this.displayVnc.clientHeight
};
}
MouseEvent(x: number, y: number, buttons: number) {
if (this.displayVnc.connected) this.displayVnc.sendPointerEvent(Clamp(x, 0, this.displayVnc.clientWidth), Clamp(y, 0, this.displayVnc.clientHeight), buttons);
}
KeyboardEvent(keysym: number, pressed: boolean) {
if (this.displayVnc.connected) this.displayVnc.sendKeyEvent(keysym, pressed);
}
}

116
backend/src/qemu/cgroup.ts Normal file
View file

@ -0,0 +1,116 @@
// Cgroup management code
// this sucks, ill mess with it later
import { appendFileSync, existsSync, mkdirSync, readFileSync, rmdirSync, writeFileSync } from 'node:fs';
import path from 'node:path';
import pino from 'pino';
let logger = pino({ name: 'Sc2/CGroup' });
export class CGroupController {
private controller;
private cg: CGroup;
constructor(controller: string, cg: CGroup) {
this.controller = controller;
this.cg = cg;
}
WriteValue(key: string, value: string) {
try {
writeFileSync(path.join(this.cg.Path(), `${this.controller}.${key}`), value);
} catch (e) {
logger.error({ error: e, controller_name: this.controller, controller_key: `${this.controller}.${key}`, value: value }, 'Failed to set CGroup controller value');
}
}
}
export class CGroup {
private path;
constructor(path: string) {
this.path = path;
}
InitControllers(wants_cpuset: boolean) {
// Configure this "root" cgroup to provide cpu and cpuset controllers to the leaf
// QEMU cgroups. A bit iffy but whatever.
if (wants_cpuset) {
try {
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu +cpuset');
} catch (err) {
logger.error({ error: err }, 'Could not provide cpuset controller to subtree. runOnCpus will not function.');
// just give up if this fails
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu');
}
} else {
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu');
}
}
GetController(controller: string) {
return new CGroupController(controller, this);
}
Path(): string {
return this.path;
}
HasSubgroup(name: string): boolean {
let subgroup_root = path.join(this.path, name);
if (existsSync(subgroup_root)) return true;
return false;
}
DeleteSubgroup(name: string): void {
let subgroup_root = path.join(this.path, name);
if (!this.HasSubgroup(name)) {
throw new Error(`Subgroup ${name} does not exist`);
}
//console.log("Deleting subgroup", name);
rmdirSync(subgroup_root);
}
// Gets a CGroup inside of this cgroup.
GetSubgroup(name: string): CGroup {
// make the subgroup if it doesn't already exist
let subgroup_root = path.join(this.path, name);
if (!this.HasSubgroup(name)) {
mkdirSync(subgroup_root);
// We need to make the subgroup threaded before we can attach a process to it.
// It's a bit weird, but oh well. Blame linux people, not me.
writeFileSync(path.join(subgroup_root, 'cgroup.type'), 'threaded');
}
return new CGroup(subgroup_root);
}
// Attaches a process to this cgroup.
AttachProcess(pid: number) {
appendFileSync(path.join(this.path, 'cgroup.procs'), pid.toString());
}
// Attaches a thread to this cgroup. (The CGroup is a threaded one. See above)
AttachThread(tid: number) {
appendFileSync(path.join(this.path, 'cgroup.threads'), tid.toString());
}
// Returns a CGroup instance for the process' current cgroup, prepared for subgroup usage.
// This will only fail if you are not using systemd or elogind,
// since even logind user sessions are run inside of a user@[UID] slice.
// NOTE: This only supports cgroups2-only systems. Systemd practically enforces that so /shrug
static Self(): CGroup {
const kCgroupSelfPath = '/proc/self/cgroup';
if (!existsSync(kCgroupSelfPath)) throw new Error('This process is not in a CGroup.');
let res = readFileSync(kCgroupSelfPath, { encoding: 'utf-8' });
// Make sure the first/only line is a cgroups2 0::/path/to/cgroup entry.
// Legacy cgroups1 is not supported.
if (res[0] != '0') throw new Error('CGroup.Self() does not work with cgroups 1 systems. Please do not the cgroups 1.');
let cg_path = res.substring(3, res.indexOf('\n'));
let cg = new CGroup(path.join('/sys/fs/cgroup', cg_path));
return cg;
}
}

View file

@ -0,0 +1,144 @@
import EventEmitter from 'events';
import { IProcess, IProcessLauncher, ProcessLaunchOptions } from '@computernewb/superqemu';
import { execaCommand } from 'execa';
import { Readable, Writable } from 'stream';
import { CGroup } from './cgroup.js';
export interface CgroupLimits {
cpuUsageMax?: number;
runOnCpus?: number[];
periodMs?: number;
limitProcess?: boolean;
}
interface CGroupValue {
controller: string;
key: string;
value: string;
}
function MakeValuesFromLimits(limits: CgroupLimits): CGroupValue[] {
let option_array = [];
// The default period is 100 ms, which matches cgroups2 defaults.
let periodUs = 100 * 1000;
// Convert a user-configured period to us, since that's what cgroups2 expects.
if(limits.periodMs)
periodUs = limits.periodMs * 1000;
if (limits.cpuUsageMax) {
// cpu.max
option_array.push({
controller: 'cpu',
key: 'max',
value: `${(limits.cpuUsageMax / 100) * periodUs} ${periodUs}`
});
}
if(limits.runOnCpus) {
// Make sure a CPU is not specified more than once. Bit hacky but oh well
let unique = [...new Set(limits.runOnCpus)];
option_array.push({
controller: 'cpuset',
key: 'cpus',
value: `${unique.join(',')}`
});
}
return option_array;
}
// A process automatically placed in a given cgroup.
class CGroupLimitedProcess extends EventEmitter implements IProcess {
private process;
stdin: Writable | null = null;
stdout: Readable | null = null;
stderr: Readable | null = null;
private root_cgroup: CGroup;
private cgroup: CGroup;
private id;
private limits;
constructor(cgroup_root: CGroup, id: string, limits: CgroupLimits, command: string, opts?: ProcessLaunchOptions) {
super();
this.root_cgroup = cgroup_root;
this.cgroup = cgroup_root.GetSubgroup(id);
this.id = id;
this.limits = limits;
if(!this.limits.limitProcess)
this.limits.limitProcess = false;
this.process = execaCommand(command, opts);
this.stdin = this.process.stdin;
this.stdout = this.process.stdout;
this.stderr = this.process.stderr;
let self = this;
this.process.on('spawn', () => {
self.initCgroup();
if(self.limits.limitProcess) {
// it should have one!
self.cgroup.AttachProcess(self.process.pid!);
}
self.emit('spawn');
});
this.process.on('exit', (code) => {
self.emit('exit', code);
});
}
initCgroup() {
// Set cgroup keys.
for(const val of MakeValuesFromLimits(this.limits)) {
let controller = this.cgroup.GetController(val.controller);
controller.WriteValue(val.key, val.value);
}
}
kill(signal?: number | NodeJS.Signals): boolean {
return this.process.kill(signal);
}
dispose(): void {
this.stdin = null;
this.stdout = null;
this.stderr = null;
this.root_cgroup.DeleteSubgroup(this.id);
this.process.removeAllListeners();
this.removeAllListeners();
}
}
export class QemuResourceLimitedLauncher implements IProcessLauncher {
private limits;
private name;
private root;
public group;
constructor(name: string, limits: CgroupLimits) {
this.root = CGroup.Self();
// Make sure
if(limits.runOnCpus) {
this.root.InitControllers(true);
} else {
this.root.InitControllers(false);
}
this.name = name;
this.limits = limits;
// XXX figure something better out
this.group = this.root.GetSubgroup(this.name);
}
launch(command: string, opts?: ProcessLaunchOptions | undefined): IProcess {
return new CGroupLimitedProcess(this.root, this.name, this.limits, command, opts);
}
}

131
backend/src/qemu/qemu_vm.ts Normal file
View file

@ -0,0 +1,131 @@
import EventEmitter from 'events';
import { QemuVM, QemuVmDefinition, VMState } from '@computernewb/superqemu';
import { VMDisplay } from '../display/types.js';
import { VncDisplay } from '../display/vnc.js';
import pino from 'pino';
import { CgroupLimits, QemuResourceLimitedLauncher } from './qemu_cgroup_launcher.js';
export class QemuVMShim {
private vm;
private display: VncDisplay | null = null;
private logger;
private cg_launcher: QemuResourceLimitedLauncher | null = null;
private resource_limits: CgroupLimits | null = null;
constructor(def: QemuVmDefinition, resourceLimits?: CgroupLimits) {
this.logger = pino({ name: `Sc2.QemuVMShim/${def.id}` });
if (resourceLimits) {
if (process.platform == 'linux') {
this.resource_limits = resourceLimits;
this.cg_launcher = new QemuResourceLimitedLauncher(def.id, resourceLimits);
this.vm = new QemuVM(def, this.cg_launcher);
} else {
// Just use the default Superqemu launcher on non-Linux platforms,
// .. regardless of if resource control is (somehow) enabled.
this.logger.warn({ platform: process.platform }, 'Resource control is not supported on this platform. Please remove or comment it out from your configuration.');
this.vm = new QemuVM(def);
}
} else {
this.vm = new QemuVM(def);
}
this.vm.on('statechange', async (newState) => {
if (newState == VMState.Started) {
await this.PlaceVCPUThreadsIntoCGroup();
}
});
}
Start(): Promise<void> {
return this.vm.Start();
}
async Stop(): Promise<void> {
await this.vm.Stop();
this.display?.Disconnect();
this.display = null;
}
Reboot(): Promise<void> {
return this.vm.Reboot();
}
Reset(): Promise<void> {
return this.vm.Reset();
}
MonitorCommand(command: string): Promise<any> {
return this.vm.MonitorCommand(command);
}
async PlaceVCPUThreadsIntoCGroup() {
let pin_vcpu_threads = false;
if (this.cg_launcher) {
// messy as all hell but oh well
if (this.resource_limits?.limitProcess == undefined) {
pin_vcpu_threads = true;
} else {
pin_vcpu_threads = !this.resource_limits?.limitProcess;
}
if (pin_vcpu_threads) {
// Get all vCPUs and pin them to the CGroup.
let cpu_res = await this.vm.QmpCommand('query-cpus-fast', {});
for (let cpu of cpu_res) {
this.logger.info(`Placing vCPU thread with TID ${cpu['thread-id']} to cgroup`);
this.cg_launcher.group.AttachThread(cpu['thread-id']);
}
}
}
}
StartDisplay(): void {
// boot it up
let info = this.vm.GetDisplayInfo();
if (info == null) throw new Error('its dead jim');
switch (info.type) {
case 'vnc-tcp':
this.display = new VncDisplay({
host: info.host || '127.0.0.1',
port: info.port || 5900,
path: null
});
break;
case 'vnc-uds':
this.display = new VncDisplay({
path: info.path
});
break;
}
let self = this;
this.display?.on('connected', () => {
// The VM can now be considered started
self.logger.info('Display connected');
});
// now that QMP has connected, connect to the display
self.display?.Connect();
}
GetDisplay(): VMDisplay | null {
return this.display;
}
GetState(): VMState {
return this.vm.GetState();
}
SnapshotsSupported(): boolean {
return this.vm.SnapshotsSupported();
}
Events(): EventEmitter {
return this.vm;
}
}

3
backend/src/util.ts Normal file
View file

@ -0,0 +1,3 @@
export function Clamp(input: number, min: number, max: number) {
return Math.min(Math.max(input, min), max);
}

View file

@ -41,14 +41,13 @@ __metadata:
languageName: node
linkType: hard
"@computernewb/superqemu@npm:^0.1.0":
version: 0.1.0
resolution: "@computernewb/superqemu@npm:0.1.0"
"@computernewb/superqemu@npm:^0.3.0":
version: 0.3.2
resolution: "@computernewb/superqemu@npm:0.3.2"
dependencies:
"@computernewb/nodejs-rfb": "npm:^0.3.0"
execa: "npm:^8.0.1"
pino: "npm:^9.3.1"
checksum: 10c0/7177b46c1093345cc3cbcc09450b8b8b09f09eb74ba5abd283aae39e9d1dbc0780f54187da075e44c78a7b683d47367010a473406c2817c36352edd0ddad2c1a
checksum: 10c0/845f1732f1e92b19bbf09b4bfc75381e707d367902535b1d520f1dc323e57f97cdf56d37a2d98e79c99443222224276d488d920e34010d199d798da7c564f7d1
languageName: node
linkType: hard
@ -1138,7 +1137,8 @@ __metadata:
version: 0.0.0-use.local
resolution: "@socketcomputer/backend@workspace:backend"
dependencies:
"@computernewb/superqemu": "npm:^0.1.0"
"@computernewb/nodejs-rfb": "npm:^0.3.0"
"@computernewb/superqemu": "npm:^0.3.0"
"@fastify/websocket": "npm:^10.0.1"
"@socketcomputer/jpeg-rs": "npm:*"
"@socketcomputer/shared": "npm:*"