Refactoring

This commit is contained in:
Lily Tsuru 2024-04-07 08:37:43 -04:00
parent 9121cc487f
commit 7a06d167eb
3 changed files with 82 additions and 76 deletions

View file

@ -69,7 +69,13 @@ export class SocketComputerServer {
if (req.headers['cf-connecting-ip'] !== undefined) {
address = req.headers['cf-connecting-ip'] as string;
}
new VMUser(connection, self.vm!, address);
if(self.vm == null) {
connection.close(1000, "VM is not running");
return;
}
self.vm?.AddUser(new VMUser(connection, address));
});
}
}

View file

@ -1,6 +1,6 @@
import { EventEmitter } from "node:events";
import { TurnQueue, UserTimeTuple, kTurnTimeSeconds } from "./TurnQueue.js";
import { VMUser } from "./VMUser.js";
import { EventEmitter } from 'node:events';
import { TurnQueue, UserTimeTuple, kTurnTimeSeconds } from './TurnQueue.js';
import { VMUser } from './VMUser.js';
import { QemuDisplay, QemuVM, VMState } from '@socketcomputer/qemu';
import { ExtendableTimer } from './ExtendableTimer.js';
@ -12,10 +12,13 @@ import { Canvas } from 'canvas';
// for the maximum socket.io experience
const kCanvasJpegQuality = 0.25;
function EncodeCanvas(canvas: Canvas): Buffer {
return canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
}
export class SocketVM extends EventEmitter {
private vm: QemuVM;
private display: QemuDisplay|null = null;
private display: QemuDisplay | null = null;
private timer: ExtendableTimer = new ExtendableTimer(15);
private users: Array<VMUser> = [];
@ -36,7 +39,7 @@ export class SocketVM extends EventEmitter {
});
this.queue.on('turnQueue', (arr: Array<UserTimeTuple>) => {
if(arr.length == 0) {
if (arr.length == 0) {
this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
// Empty queue
encoder.Init(8);
@ -48,28 +51,33 @@ export class SocketVM extends EventEmitter {
let front = this.queue.CurrentUser();
for(let user of arr.filter((u) => (u.user !== front))) {
for (let user of arr.filter((u) => u.user !== front)) {
user.user.SendMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
encoder.SetTurnSrvMessage(user.time, arr.map((item) => {
return item.user.username;
}));
encoder.Init(16 + arr.length * (2 + kMaxUserNameLength));
encoder.SetTurnSrvMessage(
user.time,
arr.map((item) => {
return item.user.username;
})
);
return encoder.Finish();
})
});
}
if(front) {
if (front) {
front.SendMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(16 + (arr.length * (2+kMaxUserNameLength)));
encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => {
return item.user.username;
}));
encoder.Init(16 + arr.length * (2 + kMaxUserNameLength));
encoder.SetTurnSrvMessage(
kTurnTimeSeconds * 1000,
arr.map((item) => {
return item.user.username;
})
);
return encoder.Finish();
})
}
});
}
});
this.vm.on('statechange', async (state: VMState) => {
if (state == VMState.Started) {
this.display = this.vm.GetDisplay();
@ -79,7 +87,6 @@ export class SocketVM extends EventEmitter {
await this.VMStopped();
}
});
}
async Start() {
@ -88,14 +95,13 @@ export class SocketVM extends EventEmitter {
async AddUser(user: VMUser) {
user.username = VMUser.GenerateName();
user.vm = this;
console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`);
// send bullshit
await this.sendFullScreen(user);
await this.SendInitialScreen(user);
// send an adduser to the user for themselves
// send an adduser to the user for themselves so they know their name
await user.SendMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(4 + Shared.kMaxUserNameLength);
encoder.SetAddUserMessage(user.username);
@ -120,7 +126,6 @@ export class SocketVM extends EventEmitter {
// officially add the user
this.users.push(user);
}
async RemUser(user: VMUser) {
@ -140,7 +145,7 @@ export class SocketVM extends EventEmitter {
async OnWSMessage(user: VMUser, message: Buffer) {
try {
let messageBuffer = message.buffer.slice(message.byteOffset);
this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false));
this.OnMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false));
} catch (err) {
// Log the error and close the connection
console.log(`Error decoding message, closing connection: (user ${user.username}, ip ${user.address})`, err);
@ -149,26 +154,22 @@ export class SocketVM extends EventEmitter {
}
}
private async OnDecodedMessage(user: VMUser, message: Shared.DeserializedMessage) {
private async OnMessage(user: VMUser, message: Shared.DeserializedMessage) {
switch (message.type) {
case Shared.MessageType.Turn:
this.queue.TryEnqueue(user);
break;
case Shared.MessageType.Mouse:
if(user != this.queue.CurrentUser())
return;
if(this.display == null)
return;
if (user != this.queue.CurrentUser()) return;
if (this.display == null) return;
this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons);
break;
case Shared.MessageType.Key:
if(user != this.queue.CurrentUser())
return;
if (user != this.queue.CurrentUser()) return;
if(this.display == null)
return;
if (this.display == null) return;
this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed);
break;
@ -180,7 +181,10 @@ export class SocketVM extends EventEmitter {
}
private async BroadcastMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) {
let buffer = messageGenerator(new Shared.MessageEncoder());
await this.BroadcastBuffer(messageGenerator(new Shared.MessageEncoder()));
}
private async BroadcastBuffer(buffer: ArrayBuffer | Buffer) {
for (let user of this.users) {
await user.SendBuffer(buffer);
}
@ -191,39 +195,23 @@ export class SocketVM extends EventEmitter {
}
private async VMRunning() {
let self = this;
// Hook up the display
this.display?.on('resize', async (width: number, height: number) => {
if(self.display == null)
return;
if (self.display == null) return;
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(4);
encoder.SetDisplaySizeMessage(width, height);
return encoder.Finish();
});
let buffers = this.MakeFullScreenData();
// sexy cream!
let canvas = self.display.GetCanvas();
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
await self.BroadcastMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(buffer.length + 8);
encoder.SetDisplayRectMessage(0, 0, buffer);
return encoder.Finish();
});
for (let buffer of buffers)
await self.BroadcastBuffer(buffer);
});
this.display?.on('rect', async (x: number, y: number, rect: ImageData) => {
let canvas = new Canvas(rect.width, rect.height);
canvas.getContext('2d').putImageData(rect, 0, 0);
let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
let buffer = EncodeCanvas(canvas);
await this.BroadcastMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(buffer.length + 8);
@ -235,22 +223,38 @@ export class SocketVM extends EventEmitter {
this.timer.Start();
}
private async sendFullScreen(user: VMUser) {
if (this.display == null) return;
private MakeFullScreenData(): Array<Buffer> {
let arr: Array<Buffer> = [];
let buffer = this.display.GetCanvas().toBuffer('image/jpeg', { quality: kCanvasJpegQuality });
const addMessage = (enc: (encoder: Shared.MessageEncoder) => ArrayBuffer) => {
arr.push(Buffer.from(enc(new Shared.MessageEncoder())));
};
await user.SendMessage((encoder: Shared.MessageEncoder) => {
if (this.display == null) return arr;
let buffer = EncodeCanvas(this.display.GetCanvas());
addMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(8);
encoder.SetDisplaySizeMessage(this.display!.Size().width, this.display!.Size().height);
return encoder.Finish();
});
await user.SendMessage((encoder: Shared.MessageEncoder) => {
addMessage((encoder: Shared.MessageEncoder) => {
encoder.Init(buffer.length + 8);
encoder.SetDisplayRectMessage(0, 0, buffer);
return encoder.Finish();
});
return arr;
}
private async SendInitialScreen(user: VMUser) {
if (this.display == null) return;
let buffers = this.MakeFullScreenData();
for (let buffer of buffers) await user.SendBuffer(buffer);
}
private async VMStopped() {

View file

@ -1,4 +1,4 @@
import { WebSocket } from "ws";
import { WebSocket, RawData } from "ws";
import { SocketVM } from "./SocketVM";
import * as Shared from "@socketcomputer/shared";
@ -7,24 +7,20 @@ export class VMUser {
public connection: WebSocket;
public address: string;
public username: string = "";
private vm: SocketVM;
public vm: SocketVM|null = null;
constructor(connection: WebSocket, slot: SocketVM, address: string) {
constructor(connection: WebSocket, address: string) {
this.connection = connection;
this.address = address;
this.vm = slot;
this.vm.AddUser(this);
this.connection.on('message', async (data, isBinary) => {
if (!isBinary) this.connection.close(1000);
await this.vm.OnWSMessage(this, data as Buffer);
this.connection.on('message', async (data: RawData, isBinary: boolean) => {
if (!isBinary) this.connection.close(1000, "No");
await this.vm?.OnWSMessage(this, data as Buffer);
});
this.connection.on('close', async () => {
console.log('closed');
await this.vm.RemUser(this);
this.connection.on('close', async (code: number, reason: Buffer) => {
await this.vm?.RemUser(this);
});
}
@ -32,7 +28,7 @@ export class VMUser {
await this.SendBuffer(messageGenerator(new Shared.MessageEncoder()));
}
async SendBuffer(buffer: ArrayBuffer): Promise<void> {
async SendBuffer(buffer: Buffer|ArrayBuffer): Promise<void> {
return new Promise((res, rej) => {
if (this.connection.readyState !== WebSocket.CLOSED) {
this.connection.send(buffer);