Seperate server classes into new typescript modules
Organization is nice.
This commit is contained in:
parent
d7326736ca
commit
9121cc487f
6 changed files with 416 additions and 408 deletions
|
@ -43,9 +43,7 @@ export function Slot_PCDef(
|
||||||
pushOption(`${netdevOption}`);
|
pushOption(`${netdevOption}`);
|
||||||
pushOption(`-device ${netAdapterModel},id=vm.netadp,netdev=vm.wan,mac=${netMac}`);
|
pushOption(`-device ${netAdapterModel},id=vm.netadp,netdev=vm.wan,mac=${netMac}`);
|
||||||
|
|
||||||
pushOption(
|
pushOption(`-drive if=none,file=${hdImagePath},cache=writeback,discard=unmap,format=${hdImageFormat},aio=${kQemuAio},id=vm.hda_drive,bps=65000000,bps_max=65000000,iops=1500,iops_max=2000`);
|
||||||
`-drive if=none,file=${hdImagePath},cache=writeback,discard=unmap,format=${hdImageFormat},aio=${kQemuAio},id=vm.hda_drive,bps=65000000,bps_max=65000000,iops=1500,iops_max=2000`
|
|
||||||
);
|
|
||||||
|
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
if (hdaIsSsd)
|
if (hdaIsSsd)
|
||||||
|
@ -64,7 +62,7 @@ export function Slot_PCDef(
|
||||||
pushOption('-device usb-tablet');
|
pushOption('-device usb-tablet');
|
||||||
|
|
||||||
return {
|
return {
|
||||||
id: "socketvm1",
|
id: 'socketvm1',
|
||||||
command: qCommand
|
command: qCommand
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,407 +1,23 @@
|
||||||
import { QemuVmDefinition, QemuDisplay, QemuVM, VMState, setSnapshot } from '@socketcomputer/qemu';
|
import { QemuVmDefinition, QemuVM, setSnapshot } from '@socketcomputer/qemu';
|
||||||
import { Slot_PCDef } from './SlotQemuDefs.js';
|
import { Slot_PCDef } from './SlotQemuDefs.js';
|
||||||
import { ExtendableTimer } from './ExtendableTimer.js';
|
|
||||||
import { EventEmitter } from 'node:events';
|
|
||||||
import * as Shared from '@socketcomputer/shared';
|
|
||||||
|
|
||||||
import { Canvas } from 'canvas';
|
|
||||||
|
|
||||||
import { FastifyInstance, fastify, FastifyRequest } from 'fastify';
|
import { FastifyInstance, fastify, FastifyRequest } from 'fastify';
|
||||||
import * as fastifyWebsocket from '@fastify/websocket';
|
import * as fastifyWebsocket from '@fastify/websocket';
|
||||||
|
|
||||||
import { WebSocket } from 'ws';
|
import { SocketVM } from './SocketVM.js';
|
||||||
|
import { VMUser } from './VMUser.js';
|
||||||
import Queue from 'mnemonist/queue.js';
|
|
||||||
import { kMaxUserNameLength } from '@socketcomputer/shared';
|
|
||||||
|
|
||||||
// for the maximum socket.io experience
|
|
||||||
const kCanvasJpegQuality = 0.25;
|
|
||||||
|
|
||||||
class VMUser {
|
|
||||||
public connection: WebSocket;
|
|
||||||
public address: string;
|
|
||||||
public username: string = "";
|
|
||||||
private vm: SocketVM;
|
|
||||||
|
|
||||||
|
|
||||||
constructor(connection: WebSocket, slot: SocketVM, address: string) {
|
|
||||||
this.connection = connection;
|
|
||||||
this.address = address;
|
|
||||||
this.vm = slot;
|
|
||||||
|
|
||||||
this.vm.AddUser(this);
|
|
||||||
|
|
||||||
this.connection.on('message', async (data, isBinary) => {
|
|
||||||
if (!isBinary) this.connection.close(1000);
|
|
||||||
await this.vm.OnWSMessage(this, data as Buffer);
|
|
||||||
});
|
|
||||||
|
|
||||||
this.connection.on('close', async () => {
|
|
||||||
console.log('closed');
|
|
||||||
await this.vm.RemUser(this);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async SendMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) {
|
|
||||||
await this.SendBuffer(messageGenerator(new Shared.MessageEncoder()));
|
|
||||||
}
|
|
||||||
|
|
||||||
async SendBuffer(buffer: ArrayBuffer): Promise<void> {
|
|
||||||
return new Promise((res, rej) => {
|
|
||||||
if (this.connection.readyState !== WebSocket.CLOSED) {
|
|
||||||
this.connection.send(buffer);
|
|
||||||
res();
|
|
||||||
}
|
|
||||||
rej(new Error('connection haves closed'));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
static GenerateName() {
|
|
||||||
return `guest${Math.floor(Math.random() * (99999 - 10000) + 10000)}`;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const kTurnTimeSeconds = 18;
|
|
||||||
|
|
||||||
type userAndTime = {
|
|
||||||
user: VMUser;
|
|
||||||
// waiting time if this user is not the front.
|
|
||||||
time: number;
|
|
||||||
};
|
|
||||||
|
|
||||||
// the turn queue. yes this is mostly stolen from cvmts but I make it cleaner by Seperate!!!!!!!
|
|
||||||
class TurnQueue extends EventEmitter {
|
|
||||||
private queue: Queue<VMUser> = new Queue<VMUser>();
|
|
||||||
private turnTime = kTurnTimeSeconds;
|
|
||||||
private interval: NodeJS.Timeout|null = null;
|
|
||||||
|
|
||||||
constructor() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
public CurrentUser(): VMUser|null {
|
|
||||||
if(this.queue.peek() == undefined)
|
|
||||||
return null;
|
|
||||||
// We already check if it'll be undefined
|
|
||||||
return this.queue.peek()!;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TryEnqueue(user: VMUser) {
|
|
||||||
// Already the current user
|
|
||||||
if (this.CurrentUser() == user) return;
|
|
||||||
|
|
||||||
// Already in the queue
|
|
||||||
if (this.queue.toArray().indexOf(user) !== -1) return;
|
|
||||||
|
|
||||||
this.queue.enqueue(user);
|
|
||||||
if (this.queue.size == 1)
|
|
||||||
this.nextTurn();
|
|
||||||
else
|
|
||||||
this.updateQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
public TryRemove(user: VMUser) {
|
|
||||||
if (this.queue.toArray().indexOf(user) !== -1) {
|
|
||||||
let hadTurn = (this.CurrentUser() === user);
|
|
||||||
this.queue = Queue.from(this.queue.toArray().filter(u => u !== user));
|
|
||||||
if (hadTurn) this.nextTurn();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private turnInterval() {
|
|
||||||
this.turnTime--;
|
|
||||||
if (this.turnTime < 1) {
|
|
||||||
this.queue.dequeue();
|
|
||||||
this.nextTurn();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private updateQueue() {
|
|
||||||
// removes the front of the quuee
|
|
||||||
let arr = this.queue.toArray();
|
|
||||||
|
|
||||||
let arr2: Array<userAndTime> = arr.map((u, index) => {
|
|
||||||
let time = this.turnTime * 1000;
|
|
||||||
if(index != 0) {
|
|
||||||
time = this.turnTime * 1000 + ((index - 1) * (kTurnTimeSeconds * 1000))
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
user: u,
|
|
||||||
time: time
|
|
||||||
};
|
|
||||||
}, this);
|
|
||||||
|
|
||||||
this.emit('turnQueue', arr2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private nextTurn() {
|
|
||||||
clearInterval(this.interval!);
|
|
||||||
if (this.queue.size === 0) {
|
|
||||||
} else {
|
|
||||||
this.turnTime = kTurnTimeSeconds;
|
|
||||||
this.interval = setInterval(() => this.turnInterval(), 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.updateQueue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class SocketVM extends EventEmitter {
|
|
||||||
private vm: QemuVM;
|
|
||||||
private display: QemuDisplay|null = null;
|
|
||||||
|
|
||||||
private timer: ExtendableTimer = new ExtendableTimer(15);
|
|
||||||
private users: Array<VMUser> = [];
|
|
||||||
private queue: TurnQueue = new TurnQueue();
|
|
||||||
|
|
||||||
constructor(vm: QemuVM) {
|
|
||||||
super();
|
|
||||||
this.vm = vm;
|
|
||||||
|
|
||||||
this.timer.on('expired', async () => {
|
|
||||||
// bye bye!
|
|
||||||
console.log(`[SocketVM] VM timer expired, resetting..`);
|
|
||||||
await this.vm.Stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
this.timer.on('expiry-near', async () => {
|
|
||||||
console.log(`[SocketVM] VM timer expires in 1 minute.`);
|
|
||||||
});
|
|
||||||
|
|
||||||
this.queue.on('turnQueue', (arr: Array<userAndTime>) => {
|
|
||||||
if(arr.length == 0) {
|
|
||||||
this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
// Empty queue
|
|
||||||
encoder.Init(8);
|
|
||||||
encoder.SetTurnSrvMessage(0, []);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let front = this.queue.CurrentUser();
|
|
||||||
|
|
||||||
for(let user of arr.filter((u) => (u.user !== front))) {
|
|
||||||
user.user.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
|
|
||||||
encoder.SetTurnSrvMessage(user.time, arr.map((item) => {
|
|
||||||
return item.user.username;
|
|
||||||
}));
|
|
||||||
return encoder.Finish();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if(front) {
|
|
||||||
front.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
|
|
||||||
encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => {
|
|
||||||
return item.user.username;
|
|
||||||
}));
|
|
||||||
return encoder.Finish();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
this.vm.on('statechange', async (state: VMState) => {
|
|
||||||
if (state == VMState.Started) {
|
|
||||||
this.display = this.vm.GetDisplay();
|
|
||||||
await this.VMRunning();
|
|
||||||
} else if (state == VMState.Stopped) {
|
|
||||||
this.display = null;
|
|
||||||
await this.VMStopped();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
async Start() {
|
|
||||||
await this.vm.Start();
|
|
||||||
}
|
|
||||||
|
|
||||||
async AddUser(user: VMUser) {
|
|
||||||
user.username = VMUser.GenerateName();
|
|
||||||
|
|
||||||
console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`);
|
|
||||||
|
|
||||||
// send bullshit
|
|
||||||
|
|
||||||
await this.sendFullScreen(user);
|
|
||||||
|
|
||||||
// send an adduser to the user for themselves
|
|
||||||
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(4 + Shared.kMaxUserNameLength);
|
|
||||||
encoder.SetAddUserMessage(user.username);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
|
|
||||||
// send an adduser for the other users
|
|
||||||
for (let userEntry of this.users) {
|
|
||||||
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(4 + Shared.kMaxUserNameLength);
|
|
||||||
encoder.SetAddUserMessage(userEntry.username);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
|
|
||||||
// also let the other user know about this user joining
|
|
||||||
await userEntry.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(4 + Shared.kMaxUserNameLength);
|
|
||||||
encoder.SetAddUserMessage(user.username);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// officially add the user
|
|
||||||
this.users.push(user);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
async RemUser(user: VMUser) {
|
|
||||||
console.log(`[SocketVM] ${user.username} (IP ${user.address}) left`);
|
|
||||||
|
|
||||||
this.users.splice(this.users.indexOf(user), 1);
|
|
||||||
this.queue.TryRemove(user);
|
|
||||||
|
|
||||||
// bye-bye!
|
|
||||||
await this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(4 + Shared.kMaxUserNameLength);
|
|
||||||
encoder.SetRemUserMessage(user.username);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async OnWSMessage(user: VMUser, message: Buffer) {
|
|
||||||
try {
|
|
||||||
let messageBuffer = message.buffer.slice(message.byteOffset);
|
|
||||||
this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false));
|
|
||||||
} catch (err) {
|
|
||||||
// get out
|
|
||||||
console.log(`FUCK! (user ${user.username}, ip ${user.address})`, err);
|
|
||||||
user.connection.close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async OnDecodedMessage(user: VMUser, message: Shared.DeserializedMessage) {
|
|
||||||
switch (message.type) {
|
|
||||||
case Shared.MessageType.Turn:
|
|
||||||
this.queue.TryEnqueue(user);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Shared.MessageType.Mouse:
|
|
||||||
if(user != this.queue.CurrentUser())
|
|
||||||
return;
|
|
||||||
if(this.display == null)
|
|
||||||
return;
|
|
||||||
this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Shared.MessageType.Key:
|
|
||||||
if(user != this.queue.CurrentUser())
|
|
||||||
return;
|
|
||||||
|
|
||||||
if(this.display == null)
|
|
||||||
return;
|
|
||||||
|
|
||||||
this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed);
|
|
||||||
break;
|
|
||||||
|
|
||||||
// ignore unhandlable messages (we won't get any invalid ones because they will cause a throw)
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async BroadcastMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) {
|
|
||||||
let buffer = messageGenerator(new Shared.MessageEncoder());
|
|
||||||
for (let user of this.users) {
|
|
||||||
await user.SendBuffer(buffer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async InsertCD(isoPath: string) {
|
|
||||||
await this.vm.ChangeRemovableMedia('vm.cd', isoPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async VMRunning() {
|
|
||||||
|
|
||||||
let self = this;
|
|
||||||
|
|
||||||
// Hook up the display
|
|
||||||
this.display?.on('resize', async (width: number, height: number) => {
|
|
||||||
if(self.display == null)
|
|
||||||
return;
|
|
||||||
|
|
||||||
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(4);
|
|
||||||
encoder.SetDisplaySizeMessage(width, height);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
|
|
||||||
// sexy cream!
|
|
||||||
|
|
||||||
|
|
||||||
let canvas = self.display.GetCanvas();
|
|
||||||
|
|
||||||
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
|
||||||
|
|
||||||
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(buffer.length + 8);
|
|
||||||
encoder.SetDisplayRectMessage(0, 0, buffer);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
this.display?.on('rect', async (x: number, y: number, rect: ImageData) => {
|
|
||||||
let canvas = new Canvas(rect.width, rect.height);
|
|
||||||
canvas.getContext('2d').putImageData(rect, 0, 0);
|
|
||||||
|
|
||||||
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
|
||||||
|
|
||||||
await this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(buffer.length + 8);
|
|
||||||
encoder.SetDisplayRectMessage(x, y, buffer);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
this.timer.Start();
|
|
||||||
}
|
|
||||||
|
|
||||||
private async sendFullScreen(user: VMUser) {
|
|
||||||
if (this.display == null) return;
|
|
||||||
|
|
||||||
let buffer = this.display.GetCanvas().toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
|
||||||
|
|
||||||
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(8);
|
|
||||||
encoder.SetDisplaySizeMessage(this.display!.Size().width, this.display!.Size().height);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
|
|
||||||
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
|
||||||
encoder.Init(buffer.length + 8);
|
|
||||||
encoder.SetDisplayRectMessage(0, 0, buffer);
|
|
||||||
return encoder.Finish();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private async VMStopped() {
|
|
||||||
await this.vm.Start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CONFIG types (not used yet)
|
// CONFIG types (not used yet)
|
||||||
export type SocketComputerConfig_VM = {
|
export type SocketComputerConfig_VM = {
|
||||||
ramsize: string,
|
ramsize: string;
|
||||||
hda: string,
|
hda: string;
|
||||||
netdev: string
|
netdev: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type SocketComputerConfig = {
|
export type SocketComputerConfig = {
|
||||||
listen: string,
|
listen: string;
|
||||||
port: number
|
port: number;
|
||||||
vm: SocketComputerConfig_VM
|
vm: SocketComputerConfig_VM;
|
||||||
};
|
};
|
||||||
|
|
||||||
export class SocketComputerServer {
|
export class SocketComputerServer {
|
||||||
|
@ -412,7 +28,7 @@ export class SocketComputerServer {
|
||||||
|
|
||||||
Init() {
|
Init() {
|
||||||
this.fastify.register(fastifyWebsocket.default);
|
this.fastify.register(fastifyWebsocket.default);
|
||||||
this.fastify.register(async (app, _) => this.CTRoutes(app), {});
|
this.fastify.register(async (app, _) => this.Routes(app), {});
|
||||||
}
|
}
|
||||||
|
|
||||||
async Listen() {
|
async Listen() {
|
||||||
|
@ -432,24 +48,26 @@ export class SocketComputerServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
async InitVM() {
|
async InitVM() {
|
||||||
|
// Create the VM definition
|
||||||
let diskpath = '/srv/collabvm/vms/socket1/winxp.qcow2';
|
let diskpath = '/srv/collabvm/vms/socket1/winxp.qcow2';
|
||||||
let slotDef: QemuVmDefinition = Slot_PCDef('2G', '-netdev tap,ifname=ktsocket1,script=no,downscript=no,id=vm.wan', 'rtl8139', 'c0:11:ab:69:44:02', true, diskpath, 'qcow2');
|
let slotDef: QemuVmDefinition = Slot_PCDef('2G', '-netdev tap,ifname=ktsocket1,script=no,downscript=no,id=vm.wan', 'rtl8139', 'c0:11:ab:69:44:02', true, diskpath, 'qcow2');
|
||||||
|
|
||||||
setSnapshot(true);
|
// Create the VM
|
||||||
|
|
||||||
// create the slot for real!
|
|
||||||
this.vm = new SocketVM(new QemuVM(slotDef));
|
this.vm = new SocketVM(new QemuVM(slotDef));
|
||||||
await this.vm.Start(); // boot it up
|
|
||||||
|
// Boot it up
|
||||||
|
setSnapshot(true);
|
||||||
|
await this.vm.Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
CTRoutes(app: FastifyInstance) {
|
Routes(app: FastifyInstance) {
|
||||||
let self = this;
|
let self = this;
|
||||||
|
|
||||||
// @ts-ignore (fastify types are broken...)
|
// @ts-ignore (fastify types are broken...)
|
||||||
app.get('/', { websocket: true }, (connection: fastifyWebsocket.WebSocket, req: FastifyRequest) => {
|
app.get('/', { websocket: true }, (connection: fastifyWebsocket.WebSocket, req: FastifyRequest) => {
|
||||||
let address = req.ip;
|
let address = req.ip;
|
||||||
if(req.headers["cf-connecting-ip"] !== undefined) {
|
if (req.headers['cf-connecting-ip'] !== undefined) {
|
||||||
address = req.headers["cf-connecting-ip"] as string;
|
address = req.headers['cf-connecting-ip'] as string;
|
||||||
}
|
}
|
||||||
new VMUser(connection, self.vm!, address);
|
new VMUser(connection, self.vm!, address);
|
||||||
});
|
});
|
||||||
|
|
259
backend/src/SocketVM.ts
Normal file
259
backend/src/SocketVM.ts
Normal file
|
@ -0,0 +1,259 @@
|
||||||
|
import { EventEmitter } from "node:events";
|
||||||
|
import { TurnQueue, UserTimeTuple, kTurnTimeSeconds } from "./TurnQueue.js";
|
||||||
|
import { VMUser } from "./VMUser.js";
|
||||||
|
import { QemuDisplay, QemuVM, VMState } from '@socketcomputer/qemu';
|
||||||
|
|
||||||
|
import { ExtendableTimer } from './ExtendableTimer.js';
|
||||||
|
import { kMaxUserNameLength } from '@socketcomputer/shared';
|
||||||
|
|
||||||
|
import * as Shared from '@socketcomputer/shared';
|
||||||
|
import { Canvas } from 'canvas';
|
||||||
|
|
||||||
|
// for the maximum socket.io experience
|
||||||
|
const kCanvasJpegQuality = 0.25;
|
||||||
|
|
||||||
|
|
||||||
|
export class SocketVM extends EventEmitter {
|
||||||
|
private vm: QemuVM;
|
||||||
|
private display: QemuDisplay|null = null;
|
||||||
|
|
||||||
|
private timer: ExtendableTimer = new ExtendableTimer(15);
|
||||||
|
private users: Array<VMUser> = [];
|
||||||
|
private queue: TurnQueue = new TurnQueue();
|
||||||
|
|
||||||
|
constructor(vm: QemuVM) {
|
||||||
|
super();
|
||||||
|
this.vm = vm;
|
||||||
|
|
||||||
|
this.timer.on('expired', async () => {
|
||||||
|
// bye bye!
|
||||||
|
console.log(`[SocketVM] VM timer expired, resetting..`);
|
||||||
|
await this.vm.Stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
this.timer.on('expiry-near', async () => {
|
||||||
|
console.log(`[SocketVM] VM timer expires in 1 minute.`);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.queue.on('turnQueue', (arr: Array<UserTimeTuple>) => {
|
||||||
|
if(arr.length == 0) {
|
||||||
|
this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
// Empty queue
|
||||||
|
encoder.Init(8);
|
||||||
|
encoder.SetTurnSrvMessage(0, []);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let front = this.queue.CurrentUser();
|
||||||
|
|
||||||
|
for(let user of arr.filter((u) => (u.user !== front))) {
|
||||||
|
user.user.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
|
||||||
|
encoder.SetTurnSrvMessage(user.time, arr.map((item) => {
|
||||||
|
return item.user.username;
|
||||||
|
}));
|
||||||
|
return encoder.Finish();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if(front) {
|
||||||
|
front.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
|
||||||
|
encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => {
|
||||||
|
return item.user.username;
|
||||||
|
}));
|
||||||
|
return encoder.Finish();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
this.vm.on('statechange', async (state: VMState) => {
|
||||||
|
if (state == VMState.Started) {
|
||||||
|
this.display = this.vm.GetDisplay();
|
||||||
|
await this.VMRunning();
|
||||||
|
} else if (state == VMState.Stopped) {
|
||||||
|
this.display = null;
|
||||||
|
await this.VMStopped();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async Start() {
|
||||||
|
await this.vm.Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
async AddUser(user: VMUser) {
|
||||||
|
user.username = VMUser.GenerateName();
|
||||||
|
|
||||||
|
console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`);
|
||||||
|
|
||||||
|
// send bullshit
|
||||||
|
|
||||||
|
await this.sendFullScreen(user);
|
||||||
|
|
||||||
|
// send an adduser to the user for themselves
|
||||||
|
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(4 + Shared.kMaxUserNameLength);
|
||||||
|
encoder.SetAddUserMessage(user.username);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
|
||||||
|
// send an adduser for the other users
|
||||||
|
for (let userEntry of this.users) {
|
||||||
|
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(4 + Shared.kMaxUserNameLength);
|
||||||
|
encoder.SetAddUserMessage(userEntry.username);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
|
||||||
|
// also let the other user know about this user joining
|
||||||
|
await userEntry.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(4 + Shared.kMaxUserNameLength);
|
||||||
|
encoder.SetAddUserMessage(user.username);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// officially add the user
|
||||||
|
this.users.push(user);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async RemUser(user: VMUser) {
|
||||||
|
console.log(`[SocketVM] ${user.username} (IP ${user.address}) left`);
|
||||||
|
|
||||||
|
this.users.splice(this.users.indexOf(user), 1);
|
||||||
|
this.queue.TryRemove(user);
|
||||||
|
|
||||||
|
// bye-bye!
|
||||||
|
await this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(4 + Shared.kMaxUserNameLength);
|
||||||
|
encoder.SetRemUserMessage(user.username);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async OnWSMessage(user: VMUser, message: Buffer) {
|
||||||
|
try {
|
||||||
|
let messageBuffer = message.buffer.slice(message.byteOffset);
|
||||||
|
this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false));
|
||||||
|
} catch (err) {
|
||||||
|
// Log the error and close the connection
|
||||||
|
console.log(`Error decoding message, closing connection: (user ${user.username}, ip ${user.address})`, err);
|
||||||
|
user.connection.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async OnDecodedMessage(user: VMUser, message: Shared.DeserializedMessage) {
|
||||||
|
switch (message.type) {
|
||||||
|
case Shared.MessageType.Turn:
|
||||||
|
this.queue.TryEnqueue(user);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Shared.MessageType.Mouse:
|
||||||
|
if(user != this.queue.CurrentUser())
|
||||||
|
return;
|
||||||
|
if(this.display == null)
|
||||||
|
return;
|
||||||
|
this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Shared.MessageType.Key:
|
||||||
|
if(user != this.queue.CurrentUser())
|
||||||
|
return;
|
||||||
|
|
||||||
|
if(this.display == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed);
|
||||||
|
break;
|
||||||
|
|
||||||
|
// ignore messages we don't know about
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async BroadcastMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) {
|
||||||
|
let buffer = messageGenerator(new Shared.MessageEncoder());
|
||||||
|
for (let user of this.users) {
|
||||||
|
await user.SendBuffer(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async InsertCD(isoPath: string) {
|
||||||
|
await this.vm.ChangeRemovableMedia('vm.cd', isoPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async VMRunning() {
|
||||||
|
|
||||||
|
let self = this;
|
||||||
|
|
||||||
|
// Hook up the display
|
||||||
|
this.display?.on('resize', async (width: number, height: number) => {
|
||||||
|
if(self.display == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(4);
|
||||||
|
encoder.SetDisplaySizeMessage(width, height);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
|
||||||
|
// sexy cream!
|
||||||
|
|
||||||
|
|
||||||
|
let canvas = self.display.GetCanvas();
|
||||||
|
|
||||||
|
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
||||||
|
|
||||||
|
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(buffer.length + 8);
|
||||||
|
encoder.SetDisplayRectMessage(0, 0, buffer);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
this.display?.on('rect', async (x: number, y: number, rect: ImageData) => {
|
||||||
|
let canvas = new Canvas(rect.width, rect.height);
|
||||||
|
canvas.getContext('2d').putImageData(rect, 0, 0);
|
||||||
|
|
||||||
|
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
||||||
|
|
||||||
|
await this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(buffer.length + 8);
|
||||||
|
encoder.SetDisplayRectMessage(x, y, buffer);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
this.timer.Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendFullScreen(user: VMUser) {
|
||||||
|
if (this.display == null) return;
|
||||||
|
|
||||||
|
let buffer = this.display.GetCanvas().toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
|
||||||
|
|
||||||
|
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(8);
|
||||||
|
encoder.SetDisplaySizeMessage(this.display!.Size().width, this.display!.Size().height);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
|
||||||
|
await user.SendMessage((encoder: Shared.MessageEncoder) => {
|
||||||
|
encoder.Init(buffer.length + 8);
|
||||||
|
encoder.SetDisplayRectMessage(0, 0, buffer);
|
||||||
|
return encoder.Finish();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async VMStopped() {
|
||||||
|
await this.vm.Start();
|
||||||
|
}
|
||||||
|
}
|
85
backend/src/TurnQueue.ts
Normal file
85
backend/src/TurnQueue.ts
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
import { EventEmitter } from 'node:events';
|
||||||
|
import Queue from 'mnemonist/queue.js';
|
||||||
|
import { VMUser } from './VMUser';
|
||||||
|
|
||||||
|
export const kTurnTimeSeconds = 18;
|
||||||
|
|
||||||
|
export type UserTimeTuple = {
|
||||||
|
user: VMUser;
|
||||||
|
// waiting time if this user is not the front.
|
||||||
|
time: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
// the turn queue. yes this is mostly stolen from cvmts but I make it cleaner by Seperate!!!!!!!
|
||||||
|
export class TurnQueue extends EventEmitter {
|
||||||
|
private queue: Queue<VMUser> = new Queue<VMUser>();
|
||||||
|
private turnTime = kTurnTimeSeconds;
|
||||||
|
private interval: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public CurrentUser(): VMUser | null {
|
||||||
|
if (this.queue.peek() == undefined) return null;
|
||||||
|
// We already check if it'll be undefined
|
||||||
|
return this.queue.peek()!;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TryEnqueue(user: VMUser) {
|
||||||
|
// Already the current user
|
||||||
|
if (this.CurrentUser() == user) return;
|
||||||
|
|
||||||
|
// Already in the queue
|
||||||
|
if (this.queue.toArray().indexOf(user) !== -1) return;
|
||||||
|
|
||||||
|
this.queue.enqueue(user);
|
||||||
|
if (this.queue.size == 1) this.nextTurn();
|
||||||
|
else this.updateQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TryRemove(user: VMUser) {
|
||||||
|
if (this.queue.toArray().indexOf(user) !== -1) {
|
||||||
|
let hadTurn = this.CurrentUser() === user;
|
||||||
|
this.queue = Queue.from(this.queue.toArray().filter((u) => u !== user));
|
||||||
|
if (hadTurn) this.nextTurn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private turnInterval() {
|
||||||
|
this.turnTime--;
|
||||||
|
if (this.turnTime < 1) {
|
||||||
|
this.queue.dequeue();
|
||||||
|
this.nextTurn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private updateQueue() {
|
||||||
|
// removes the front of the quuee
|
||||||
|
let arr = this.queue.toArray();
|
||||||
|
|
||||||
|
let arr2: Array<UserTimeTuple> = arr.map((u, index) => {
|
||||||
|
let time = this.turnTime * 1000;
|
||||||
|
if (index != 0) {
|
||||||
|
time = this.turnTime * 1000 + (index - 1) * (kTurnTimeSeconds * 1000);
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
user: u,
|
||||||
|
time: time
|
||||||
|
};
|
||||||
|
}, this);
|
||||||
|
|
||||||
|
this.emit('turnQueue', arr2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private nextTurn() {
|
||||||
|
clearInterval(this.interval!);
|
||||||
|
if (this.queue.size === 0) {
|
||||||
|
} else {
|
||||||
|
this.turnTime = kTurnTimeSeconds;
|
||||||
|
this.interval = setInterval(() => this.turnInterval(), 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.updateQueue();
|
||||||
|
}
|
||||||
|
}
|
48
backend/src/VMUser.ts
Normal file
48
backend/src/VMUser.ts
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
import { WebSocket } from "ws";
|
||||||
|
import { SocketVM } from "./SocketVM";
|
||||||
|
|
||||||
|
import * as Shared from "@socketcomputer/shared";
|
||||||
|
|
||||||
|
export class VMUser {
|
||||||
|
public connection: WebSocket;
|
||||||
|
public address: string;
|
||||||
|
public username: string = "";
|
||||||
|
private vm: SocketVM;
|
||||||
|
|
||||||
|
|
||||||
|
constructor(connection: WebSocket, slot: SocketVM, address: string) {
|
||||||
|
this.connection = connection;
|
||||||
|
this.address = address;
|
||||||
|
this.vm = slot;
|
||||||
|
|
||||||
|
this.vm.AddUser(this);
|
||||||
|
|
||||||
|
this.connection.on('message', async (data, isBinary) => {
|
||||||
|
if (!isBinary) this.connection.close(1000);
|
||||||
|
await this.vm.OnWSMessage(this, data as Buffer);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.connection.on('close', async () => {
|
||||||
|
console.log('closed');
|
||||||
|
await this.vm.RemUser(this);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async SendMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) {
|
||||||
|
await this.SendBuffer(messageGenerator(new Shared.MessageEncoder()));
|
||||||
|
}
|
||||||
|
|
||||||
|
async SendBuffer(buffer: ArrayBuffer): Promise<void> {
|
||||||
|
return new Promise((res, rej) => {
|
||||||
|
if (this.connection.readyState !== WebSocket.CLOSED) {
|
||||||
|
this.connection.send(buffer);
|
||||||
|
res();
|
||||||
|
}
|
||||||
|
rej(new Error('connection haves closed'));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static GenerateName() {
|
||||||
|
return `guest${Math.floor(Math.random() * (99999 - 10000) + 10000)}`;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue