From 9121cc487f885faaf6af23cf1606919160aefe5e Mon Sep 17 00:00:00 2001 From: modeco80 Date: Sat, 6 Apr 2024 23:46:37 -0400 Subject: [PATCH] Seperate server classes into new typescript modules Organization is nice. --- backend/src/ExtendableTimer.ts | 4 +- backend/src/SlotQemuDefs.ts | 6 +- backend/src/SocketComputerServer.ts | 422 ++-------------------------- backend/src/SocketVM.ts | 259 +++++++++++++++++ backend/src/TurnQueue.ts | 85 ++++++ backend/src/VMUser.ts | 48 ++++ 6 files changed, 416 insertions(+), 408 deletions(-) create mode 100644 backend/src/SocketVM.ts create mode 100644 backend/src/TurnQueue.ts create mode 100644 backend/src/VMUser.ts diff --git a/backend/src/ExtendableTimer.ts b/backend/src/ExtendableTimer.ts index 77453c8..a976565 100644 --- a/backend/src/ExtendableTimer.ts +++ b/backend/src/ExtendableTimer.ts @@ -18,8 +18,8 @@ export class ExtendableTimer extends EventEmitter { this.timeout = setTimeout(() => { this.iterationcount--; if (this.iterationcount == 1) { - this.emit('expiry-near'); - } else if (this.iterationcount == 0) { + this.emit('expiry-near'); + } else if (this.iterationcount == 0) { return this.emit('expired'); } this.Arm(); diff --git a/backend/src/SlotQemuDefs.ts b/backend/src/SlotQemuDefs.ts index 4b5914d..b791efb 100644 --- a/backend/src/SlotQemuDefs.ts +++ b/backend/src/SlotQemuDefs.ts @@ -43,9 +43,7 @@ export function Slot_PCDef( pushOption(`${netdevOption}`); pushOption(`-device ${netAdapterModel},id=vm.netadp,netdev=vm.wan,mac=${netMac}`); - pushOption( - `-drive if=none,file=${hdImagePath},cache=writeback,discard=unmap,format=${hdImageFormat},aio=${kQemuAio},id=vm.hda_drive,bps=65000000,bps_max=65000000,iops=1500,iops_max=2000` - ); + pushOption(`-drive if=none,file=${hdImagePath},cache=writeback,discard=unmap,format=${hdImageFormat},aio=${kQemuAio},id=vm.hda_drive,bps=65000000,bps_max=65000000,iops=1500,iops_max=2000`); // prettier-ignore if (hdaIsSsd) @@ -64,7 +62,7 @@ export function Slot_PCDef( pushOption('-device usb-tablet'); return { - id: "socketvm1", + id: 'socketvm1', command: qCommand }; } diff --git a/backend/src/SocketComputerServer.ts b/backend/src/SocketComputerServer.ts index 788146e..aa43583 100644 --- a/backend/src/SocketComputerServer.ts +++ b/backend/src/SocketComputerServer.ts @@ -1,418 +1,34 @@ -import { QemuVmDefinition, QemuDisplay, QemuVM, VMState, setSnapshot } from '@socketcomputer/qemu'; +import { QemuVmDefinition, QemuVM, setSnapshot } from '@socketcomputer/qemu'; import { Slot_PCDef } from './SlotQemuDefs.js'; -import { ExtendableTimer } from './ExtendableTimer.js'; -import { EventEmitter } from 'node:events'; -import * as Shared from '@socketcomputer/shared'; - -import { Canvas } from 'canvas'; import { FastifyInstance, fastify, FastifyRequest } from 'fastify'; import * as fastifyWebsocket from '@fastify/websocket'; -import { WebSocket } from 'ws'; - -import Queue from 'mnemonist/queue.js'; -import { kMaxUserNameLength } from '@socketcomputer/shared'; - -// for the maximum socket.io experience -const kCanvasJpegQuality = 0.25; - -class VMUser { - public connection: WebSocket; - public address: string; - public username: string = ""; - private vm: SocketVM; - - - constructor(connection: WebSocket, slot: SocketVM, address: string) { - this.connection = connection; - this.address = address; - this.vm = slot; - - this.vm.AddUser(this); - - this.connection.on('message', async (data, isBinary) => { - if (!isBinary) this.connection.close(1000); - await this.vm.OnWSMessage(this, data as Buffer); - }); - - this.connection.on('close', async () => { - console.log('closed'); - await this.vm.RemUser(this); - }); - } - - async SendMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) { - await this.SendBuffer(messageGenerator(new Shared.MessageEncoder())); - } - - async SendBuffer(buffer: ArrayBuffer): Promise { - return new Promise((res, rej) => { - if (this.connection.readyState !== WebSocket.CLOSED) { - this.connection.send(buffer); - res(); - } - rej(new Error('connection haves closed')); - }); - } - - static GenerateName() { - return `guest${Math.floor(Math.random() * (99999 - 10000) + 10000)}`; - } -} - -const kTurnTimeSeconds = 18; - -type userAndTime = { - user: VMUser; - // waiting time if this user is not the front. - time: number; -}; - -// the turn queue. yes this is mostly stolen from cvmts but I make it cleaner by Seperate!!!!!!! -class TurnQueue extends EventEmitter { - private queue: Queue = new Queue(); - private turnTime = kTurnTimeSeconds; - private interval: NodeJS.Timeout|null = null; - - constructor() { - super(); - } - - public CurrentUser(): VMUser|null { - if(this.queue.peek() == undefined) - return null; - // We already check if it'll be undefined - return this.queue.peek()!; - } - - public TryEnqueue(user: VMUser) { - // Already the current user - if (this.CurrentUser() == user) return; - - // Already in the queue - if (this.queue.toArray().indexOf(user) !== -1) return; - - this.queue.enqueue(user); - if (this.queue.size == 1) - this.nextTurn(); - else - this.updateQueue(); - } - - public TryRemove(user: VMUser) { - if (this.queue.toArray().indexOf(user) !== -1) { - let hadTurn = (this.CurrentUser() === user); - this.queue = Queue.from(this.queue.toArray().filter(u => u !== user)); - if (hadTurn) this.nextTurn(); - } - } - - private turnInterval() { - this.turnTime--; - if (this.turnTime < 1) { - this.queue.dequeue(); - this.nextTurn(); - } - } - - private updateQueue() { - // removes the front of the quuee - let arr = this.queue.toArray(); - - let arr2: Array = arr.map((u, index) => { - let time = this.turnTime * 1000; - if(index != 0) { - time = this.turnTime * 1000 + ((index - 1) * (kTurnTimeSeconds * 1000)) - } - return { - user: u, - time: time - }; - }, this); - - this.emit('turnQueue', arr2); - } - - private nextTurn() { - clearInterval(this.interval!); - if (this.queue.size === 0) { - } else { - this.turnTime = kTurnTimeSeconds; - this.interval = setInterval(() => this.turnInterval(), 1000); - } - - this.updateQueue(); - } -} - -class SocketVM extends EventEmitter { - private vm: QemuVM; - private display: QemuDisplay|null = null; - - private timer: ExtendableTimer = new ExtendableTimer(15); - private users: Array = []; - private queue: TurnQueue = new TurnQueue(); - - constructor(vm: QemuVM) { - super(); - this.vm = vm; - - this.timer.on('expired', async () => { - // bye bye! - console.log(`[SocketVM] VM timer expired, resetting..`); - await this.vm.Stop(); - }); - - this.timer.on('expiry-near', async () => { - console.log(`[SocketVM] VM timer expires in 1 minute.`); - }); - - this.queue.on('turnQueue', (arr: Array) => { - if(arr.length == 0) { - this.BroadcastMessage((encoder: Shared.MessageEncoder) => { - // Empty queue - encoder.Init(8); - encoder.SetTurnSrvMessage(0, []); - return encoder.Finish(); - }); - return; - } - - let front = this.queue.CurrentUser(); - - for(let user of arr.filter((u) => (u.user !== front))) { - user.user.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); - encoder.SetTurnSrvMessage(user.time, arr.map((item) => { - return item.user.username; - })); - return encoder.Finish(); - }) - } - - if(front) { - front.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); - encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => { - return item.user.username; - })); - return encoder.Finish(); - }) - } - }); - - - this.vm.on('statechange', async (state: VMState) => { - if (state == VMState.Started) { - this.display = this.vm.GetDisplay(); - await this.VMRunning(); - } else if (state == VMState.Stopped) { - this.display = null; - await this.VMStopped(); - } - }); - - } - - async Start() { - await this.vm.Start(); - } - - async AddUser(user: VMUser) { - user.username = VMUser.GenerateName(); - - console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`); - - // send bullshit - - await this.sendFullScreen(user); - - // send an adduser to the user for themselves - await user.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4 + Shared.kMaxUserNameLength); - encoder.SetAddUserMessage(user.username); - return encoder.Finish(); - }); - - // send an adduser for the other users - for (let userEntry of this.users) { - await user.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4 + Shared.kMaxUserNameLength); - encoder.SetAddUserMessage(userEntry.username); - return encoder.Finish(); - }); - - // also let the other user know about this user joining - await userEntry.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4 + Shared.kMaxUserNameLength); - encoder.SetAddUserMessage(user.username); - return encoder.Finish(); - }); - } - - // officially add the user - this.users.push(user); - - } - - async RemUser(user: VMUser) { - console.log(`[SocketVM] ${user.username} (IP ${user.address}) left`); - - this.users.splice(this.users.indexOf(user), 1); - this.queue.TryRemove(user); - - // bye-bye! - await this.BroadcastMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4 + Shared.kMaxUserNameLength); - encoder.SetRemUserMessage(user.username); - return encoder.Finish(); - }); - } - - async OnWSMessage(user: VMUser, message: Buffer) { - try { - let messageBuffer = message.buffer.slice(message.byteOffset); - this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false)); - } catch (err) { - // get out - console.log(`FUCK! (user ${user.username}, ip ${user.address})`, err); - user.connection.close(); - return; - } - } - - private async OnDecodedMessage(user: VMUser, message: Shared.DeserializedMessage) { - switch (message.type) { - case Shared.MessageType.Turn: - this.queue.TryEnqueue(user); - break; - - case Shared.MessageType.Mouse: - if(user != this.queue.CurrentUser()) - return; - if(this.display == null) - return; - this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons); - break; - - case Shared.MessageType.Key: - if(user != this.queue.CurrentUser()) - return; - - if(this.display == null) - return; - - this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed); - break; - - // ignore unhandlable messages (we won't get any invalid ones because they will cause a throw) - default: - break; - } - } - - private async BroadcastMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) { - let buffer = messageGenerator(new Shared.MessageEncoder()); - for (let user of this.users) { - await user.SendBuffer(buffer); - } - } - - private async InsertCD(isoPath: string) { - await this.vm.ChangeRemovableMedia('vm.cd', isoPath); - } - - private async VMRunning() { - - let self = this; - - // Hook up the display - this.display?.on('resize', async (width: number, height: number) => { - if(self.display == null) - return; - - await self.BroadcastMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4); - encoder.SetDisplaySizeMessage(width, height); - return encoder.Finish(); - }); - - // sexy cream! - - - let canvas = self.display.GetCanvas(); - - let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); - - await self.BroadcastMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(buffer.length + 8); - encoder.SetDisplayRectMessage(0, 0, buffer); - return encoder.Finish(); - }); - }); - - this.display?.on('rect', async (x: number, y: number, rect: ImageData) => { - let canvas = new Canvas(rect.width, rect.height); - canvas.getContext('2d').putImageData(rect, 0, 0); - - let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); - - await this.BroadcastMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(buffer.length + 8); - encoder.SetDisplayRectMessage(x, y, buffer); - return encoder.Finish(); - }); - }); - - this.timer.Start(); - } - - private async sendFullScreen(user: VMUser) { - if (this.display == null) return; - - let buffer = this.display.GetCanvas().toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); - - await user.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(8); - encoder.SetDisplaySizeMessage(this.display!.Size().width, this.display!.Size().height); - return encoder.Finish(); - }); - - await user.SendMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(buffer.length + 8); - encoder.SetDisplayRectMessage(0, 0, buffer); - return encoder.Finish(); - }); - } - - private async VMStopped() { - await this.vm.Start(); - } -} +import { SocketVM } from './SocketVM.js'; +import { VMUser } from './VMUser.js'; // CONFIG types (not used yet) export type SocketComputerConfig_VM = { - ramsize: string, - hda: string, - netdev: string + ramsize: string; + hda: string; + netdev: string; }; export type SocketComputerConfig = { - listen: string, - port: number - vm: SocketComputerConfig_VM + listen: string; + port: number; + vm: SocketComputerConfig_VM; }; export class SocketComputerServer { - private vm: SocketVM|null = null; + private vm: SocketVM | null = null; private fastify: FastifyInstance = fastify({ exposeHeadRoutes: false }); Init() { this.fastify.register(fastifyWebsocket.default); - this.fastify.register(async (app, _) => this.CTRoutes(app), {}); + this.fastify.register(async (app, _) => this.Routes(app), {}); } async Listen() { @@ -432,24 +48,26 @@ export class SocketComputerServer { } async InitVM() { + // Create the VM definition let diskpath = '/srv/collabvm/vms/socket1/winxp.qcow2'; let slotDef: QemuVmDefinition = Slot_PCDef('2G', '-netdev tap,ifname=ktsocket1,script=no,downscript=no,id=vm.wan', 'rtl8139', 'c0:11:ab:69:44:02', true, diskpath, 'qcow2'); - setSnapshot(true); - - // create the slot for real! + // Create the VM this.vm = new SocketVM(new QemuVM(slotDef)); - await this.vm.Start(); // boot it up + + // Boot it up + setSnapshot(true); + await this.vm.Start(); } - CTRoutes(app: FastifyInstance) { + Routes(app: FastifyInstance) { let self = this; // @ts-ignore (fastify types are broken...) app.get('/', { websocket: true }, (connection: fastifyWebsocket.WebSocket, req: FastifyRequest) => { let address = req.ip; - if(req.headers["cf-connecting-ip"] !== undefined) { - address = req.headers["cf-connecting-ip"] as string; + if (req.headers['cf-connecting-ip'] !== undefined) { + address = req.headers['cf-connecting-ip'] as string; } new VMUser(connection, self.vm!, address); }); diff --git a/backend/src/SocketVM.ts b/backend/src/SocketVM.ts new file mode 100644 index 0000000..f972268 --- /dev/null +++ b/backend/src/SocketVM.ts @@ -0,0 +1,259 @@ +import { EventEmitter } from "node:events"; +import { TurnQueue, UserTimeTuple, kTurnTimeSeconds } from "./TurnQueue.js"; +import { VMUser } from "./VMUser.js"; +import { QemuDisplay, QemuVM, VMState } from '@socketcomputer/qemu'; + +import { ExtendableTimer } from './ExtendableTimer.js'; +import { kMaxUserNameLength } from '@socketcomputer/shared'; + +import * as Shared from '@socketcomputer/shared'; +import { Canvas } from 'canvas'; + +// for the maximum socket.io experience +const kCanvasJpegQuality = 0.25; + + +export class SocketVM extends EventEmitter { + private vm: QemuVM; + private display: QemuDisplay|null = null; + + private timer: ExtendableTimer = new ExtendableTimer(15); + private users: Array = []; + private queue: TurnQueue = new TurnQueue(); + + constructor(vm: QemuVM) { + super(); + this.vm = vm; + + this.timer.on('expired', async () => { + // bye bye! + console.log(`[SocketVM] VM timer expired, resetting..`); + await this.vm.Stop(); + }); + + this.timer.on('expiry-near', async () => { + console.log(`[SocketVM] VM timer expires in 1 minute.`); + }); + + this.queue.on('turnQueue', (arr: Array) => { + if(arr.length == 0) { + this.BroadcastMessage((encoder: Shared.MessageEncoder) => { + // Empty queue + encoder.Init(8); + encoder.SetTurnSrvMessage(0, []); + return encoder.Finish(); + }); + return; + } + + let front = this.queue.CurrentUser(); + + for(let user of arr.filter((u) => (u.user !== front))) { + user.user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); + encoder.SetTurnSrvMessage(user.time, arr.map((item) => { + return item.user.username; + })); + return encoder.Finish(); + }) + } + + if(front) { + front.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); + encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => { + return item.user.username; + })); + return encoder.Finish(); + }) + } + }); + + + this.vm.on('statechange', async (state: VMState) => { + if (state == VMState.Started) { + this.display = this.vm.GetDisplay(); + await this.VMRunning(); + } else if (state == VMState.Stopped) { + this.display = null; + await this.VMStopped(); + } + }); + + } + + async Start() { + await this.vm.Start(); + } + + async AddUser(user: VMUser) { + user.username = VMUser.GenerateName(); + + console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`); + + // send bullshit + + await this.sendFullScreen(user); + + // send an adduser to the user for themselves + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetAddUserMessage(user.username); + return encoder.Finish(); + }); + + // send an adduser for the other users + for (let userEntry of this.users) { + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetAddUserMessage(userEntry.username); + return encoder.Finish(); + }); + + // also let the other user know about this user joining + await userEntry.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetAddUserMessage(user.username); + return encoder.Finish(); + }); + } + + // officially add the user + this.users.push(user); + + } + + async RemUser(user: VMUser) { + console.log(`[SocketVM] ${user.username} (IP ${user.address}) left`); + + this.users.splice(this.users.indexOf(user), 1); + this.queue.TryRemove(user); + + // bye-bye! + await this.BroadcastMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetRemUserMessage(user.username); + return encoder.Finish(); + }); + } + + async OnWSMessage(user: VMUser, message: Buffer) { + try { + let messageBuffer = message.buffer.slice(message.byteOffset); + this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false)); + } catch (err) { + // Log the error and close the connection + console.log(`Error decoding message, closing connection: (user ${user.username}, ip ${user.address})`, err); + user.connection.close(); + return; + } + } + + private async OnDecodedMessage(user: VMUser, message: Shared.DeserializedMessage) { + switch (message.type) { + case Shared.MessageType.Turn: + this.queue.TryEnqueue(user); + break; + + case Shared.MessageType.Mouse: + if(user != this.queue.CurrentUser()) + return; + if(this.display == null) + return; + this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons); + break; + + case Shared.MessageType.Key: + if(user != this.queue.CurrentUser()) + return; + + if(this.display == null) + return; + + this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed); + break; + + // ignore messages we don't know about + default: + break; + } + } + + private async BroadcastMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) { + let buffer = messageGenerator(new Shared.MessageEncoder()); + for (let user of this.users) { + await user.SendBuffer(buffer); + } + } + + private async InsertCD(isoPath: string) { + await this.vm.ChangeRemovableMedia('vm.cd', isoPath); + } + + private async VMRunning() { + + let self = this; + + // Hook up the display + this.display?.on('resize', async (width: number, height: number) => { + if(self.display == null) + return; + + await self.BroadcastMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4); + encoder.SetDisplaySizeMessage(width, height); + return encoder.Finish(); + }); + + // sexy cream! + + + let canvas = self.display.GetCanvas(); + + let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); + + await self.BroadcastMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(buffer.length + 8); + encoder.SetDisplayRectMessage(0, 0, buffer); + return encoder.Finish(); + }); + }); + + this.display?.on('rect', async (x: number, y: number, rect: ImageData) => { + let canvas = new Canvas(rect.width, rect.height); + canvas.getContext('2d').putImageData(rect, 0, 0); + + let buffer = canvas.toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); + + await this.BroadcastMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(buffer.length + 8); + encoder.SetDisplayRectMessage(x, y, buffer); + return encoder.Finish(); + }); + }); + + this.timer.Start(); + } + + private async sendFullScreen(user: VMUser) { + if (this.display == null) return; + + let buffer = this.display.GetCanvas().toBuffer('image/jpeg', { quality: kCanvasJpegQuality }); + + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(8); + encoder.SetDisplaySizeMessage(this.display!.Size().width, this.display!.Size().height); + return encoder.Finish(); + }); + + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(buffer.length + 8); + encoder.SetDisplayRectMessage(0, 0, buffer); + return encoder.Finish(); + }); + } + + private async VMStopped() { + await this.vm.Start(); + } +} diff --git a/backend/src/TurnQueue.ts b/backend/src/TurnQueue.ts new file mode 100644 index 0000000..3dffa91 --- /dev/null +++ b/backend/src/TurnQueue.ts @@ -0,0 +1,85 @@ +import { EventEmitter } from 'node:events'; +import Queue from 'mnemonist/queue.js'; +import { VMUser } from './VMUser'; + +export const kTurnTimeSeconds = 18; + +export type UserTimeTuple = { + user: VMUser; + // waiting time if this user is not the front. + time: number; +}; + +// the turn queue. yes this is mostly stolen from cvmts but I make it cleaner by Seperate!!!!!!! +export class TurnQueue extends EventEmitter { + private queue: Queue = new Queue(); + private turnTime = kTurnTimeSeconds; + private interval: NodeJS.Timeout | null = null; + + constructor() { + super(); + } + + public CurrentUser(): VMUser | null { + if (this.queue.peek() == undefined) return null; + // We already check if it'll be undefined + return this.queue.peek()!; + } + + public TryEnqueue(user: VMUser) { + // Already the current user + if (this.CurrentUser() == user) return; + + // Already in the queue + if (this.queue.toArray().indexOf(user) !== -1) return; + + this.queue.enqueue(user); + if (this.queue.size == 1) this.nextTurn(); + else this.updateQueue(); + } + + public TryRemove(user: VMUser) { + if (this.queue.toArray().indexOf(user) !== -1) { + let hadTurn = this.CurrentUser() === user; + this.queue = Queue.from(this.queue.toArray().filter((u) => u !== user)); + if (hadTurn) this.nextTurn(); + } + } + + private turnInterval() { + this.turnTime--; + if (this.turnTime < 1) { + this.queue.dequeue(); + this.nextTurn(); + } + } + + private updateQueue() { + // removes the front of the quuee + let arr = this.queue.toArray(); + + let arr2: Array = arr.map((u, index) => { + let time = this.turnTime * 1000; + if (index != 0) { + time = this.turnTime * 1000 + (index - 1) * (kTurnTimeSeconds * 1000); + } + return { + user: u, + time: time + }; + }, this); + + this.emit('turnQueue', arr2); + } + + private nextTurn() { + clearInterval(this.interval!); + if (this.queue.size === 0) { + } else { + this.turnTime = kTurnTimeSeconds; + this.interval = setInterval(() => this.turnInterval(), 1000); + } + + this.updateQueue(); + } +} diff --git a/backend/src/VMUser.ts b/backend/src/VMUser.ts new file mode 100644 index 0000000..eefe922 --- /dev/null +++ b/backend/src/VMUser.ts @@ -0,0 +1,48 @@ +import { WebSocket } from "ws"; +import { SocketVM } from "./SocketVM"; + +import * as Shared from "@socketcomputer/shared"; + +export class VMUser { + public connection: WebSocket; + public address: string; + public username: string = ""; + private vm: SocketVM; + + + constructor(connection: WebSocket, slot: SocketVM, address: string) { + this.connection = connection; + this.address = address; + this.vm = slot; + + this.vm.AddUser(this); + + this.connection.on('message', async (data, isBinary) => { + if (!isBinary) this.connection.close(1000); + await this.vm.OnWSMessage(this, data as Buffer); + }); + + this.connection.on('close', async () => { + console.log('closed'); + await this.vm.RemUser(this); + }); + } + + async SendMessage(messageGenerator: (encoder: Shared.MessageEncoder) => ArrayBuffer) { + await this.SendBuffer(messageGenerator(new Shared.MessageEncoder())); + } + + async SendBuffer(buffer: ArrayBuffer): Promise { + return new Promise((res, rej) => { + if (this.connection.readyState !== WebSocket.CLOSED) { + this.connection.send(buffer); + res(); + } + rej(new Error('connection haves closed')); + }); + } + + static GenerateName() { + return `guest${Math.floor(Math.random() * (99999 - 10000) + 10000)}`; + } +}