diff --git a/backend/src/SocketComputerServer.ts b/backend/src/SocketComputerServer.ts index a176669..5992b86 100644 --- a/backend/src/SocketComputerServer.ts +++ b/backend/src/SocketComputerServer.ts @@ -6,7 +6,7 @@ import * as Shared from '@socketcomputer/shared'; import { Canvas } from 'canvas'; -import { FastifyInstance, fastify } from 'fastify'; +import { FastifyInstance, fastify, FastifyRequest } from 'fastify'; import * as fastifyWebsocket from '@fastify/websocket'; import { WebSocket } from 'ws'; @@ -19,11 +19,14 @@ const kCanvasJpegQuality = 0.25; class VMUser { public connection: WebSocket; + public address: string; public username: string; - private vm: VirtualMachine; + private vm: SocketVM; - constructor(connection: WebSocket, slot: VirtualMachine) { + + constructor(connection: WebSocket, slot: SocketVM, address: string) { this.connection = connection; + this.address = address; this.vm = slot; this.vm.AddUser(this); @@ -88,7 +91,10 @@ class TurnQueue extends EventEmitter { if (this.queue.toArray().indexOf(user) !== -1) return; this.queue.enqueue(user); - if (this.queue.size == 1) this.nextTurn(); + if (this.queue.size == 1) + this.nextTurn(); + else + this.updateQueue(); } public TryRemove(user: VMUser) { @@ -107,16 +113,7 @@ class TurnQueue extends EventEmitter { } } - private nextTurn() { - clearInterval(this.interval); - if (this.queue.size === 0) { - } else { - this.turnTime = kTurnTimeSeconds; - this.interval = setInterval(() => this.turnInterval(), 1000); - } - - //if (this.queue.size == 1) this.emit('turnQueue', [{ user: this.CurrentUser(), time: this.turnTime * 1000 }]); - + private updateQueue() { // removes the front of the quuee let arr = this.queue.toArray(); @@ -133,10 +130,20 @@ class TurnQueue extends EventEmitter { this.emit('turnQueue', arr2); } + + private nextTurn() { + clearInterval(this.interval); + if (this.queue.size === 0) { + } else { + this.turnTime = kTurnTimeSeconds; + this.interval = setInterval(() => this.turnInterval(), 1000); + } + + this.updateQueue(); + } } -// A slot. -class VirtualMachine extends EventEmitter { +class SocketVM extends EventEmitter { private vm: QemuVM; private display: QemuDisplay; @@ -148,30 +155,50 @@ class VirtualMachine extends EventEmitter { super(); this.vm = vm; - this.timer = new ExtendableTimer(15); + this.timer = new ExtendableTimer(2); this.timer.on('expired', async () => { // bye bye! - console.log(`[VM] VM expired, resetting..`); + console.log(`[SocketVM] VM expired, resetting..`); await this.vm.Stop(); }); this.timer.on('expiry-near', async () => { - console.log(`[VM] about to expire!`); + console.log(`[SocketVM] about to expire!`); }); this.queue.on('turnQueue', (arr: Array) => { - for(let user of arr) { + if(arr.length == 0) { + this.BroadcastMessage((encoder: Shared.MessageEncoder) => { + // Empty queue + encoder.Init(8); + encoder.SetTurnSrvMessage(0, []); + return encoder.Finish(); + }); + return; + } + + let front = this.queue.CurrentUser(); + + for(let user of arr.filter((u) => (u.user !== front))) { user.user.SendMessage((encoder: Shared.MessageEncoder) => { - let n = 16 + (arr.length * (2+kMaxUserNameLength)); - console.log(n) - encoder.Init(n); + encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); encoder.SetTurnSrvMessage(user.time, arr.map((item) => { return item.user.username; })); return encoder.Finish(); }) } + + if(front) { + front.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(16 + (arr.length * (2+kMaxUserNameLength))); + encoder.SetTurnSrvMessage(kTurnTimeSeconds * 1000, arr.map((item) => { + return item.user.username; + })); + return encoder.Finish(); + }) + } }); @@ -185,26 +212,6 @@ class VirtualMachine extends EventEmitter { } }); - this.queue.on('turnQueue', (arr: Array) => { - console.log("Turn queue", arr); - - for (let entry of arr) { - entry.user.SendMessage((encoder: Shared.MessageEncoder) => { - // painnnnnnnnnnnnnnnnnnn fuck i should just make a dynamic buffer system lol - encoder.Init(64+ arr.length * (Shared.kMaxUserNameLength)); - - // pain ? - encoder.SetTurnSrvMessage( - entry.time, - arr.map((entry: userAndTime) => { - return entry.user.username; - }) - ); - - return encoder.Finish(); - }); - } - }); } async Start() { @@ -214,15 +221,29 @@ class VirtualMachine extends EventEmitter { async AddUser(user: VMUser) { user.username = VMUser.GenerateName(); - console.log(user.username, 'joined.'); + console.log(`[SocketVM] ${user.username} (IP ${user.address}) joined`); // send bullshit await this.sendFullScreen(user); - // send an adduser for all users - for (let user of this.users) { - user.SendMessage((encoder: Shared.MessageEncoder) => { + // send an adduser to the user for themselves + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetAddUserMessage(user.username); + return encoder.Finish(); + }); + + // send an adduser for the other users + for (let userEntry of this.users) { + await user.SendMessage((encoder: Shared.MessageEncoder) => { + encoder.Init(4 + Shared.kMaxUserNameLength); + encoder.SetAddUserMessage(userEntry.username); + return encoder.Finish(); + }); + + // also let the other user know about this user joining + await userEntry.SendMessage((encoder: Shared.MessageEncoder) => { encoder.Init(4 + Shared.kMaxUserNameLength); encoder.SetAddUserMessage(user.username); return encoder.Finish(); @@ -232,16 +253,10 @@ class VirtualMachine extends EventEmitter { // officially add the user this.users.push(user); - // hello! - await this.BroadcastMessage((encoder: Shared.MessageEncoder) => { - encoder.Init(4 + Shared.kMaxUserNameLength); - encoder.SetAddUserMessage(user.username); - return encoder.Finish(); - }); } async RemUser(user: VMUser) { - console.log(user.username, 'left.'); + console.log(`[SocketVM] ${user.username} (IP ${user.address}) left`); this.users.splice(this.users.indexOf(user), 1); this.queue.TryRemove(user); @@ -260,7 +275,7 @@ class VirtualMachine extends EventEmitter { this.OnDecodedMessage(user, await Shared.MessageDecoder.ReadMessage(messageBuffer, false)); } catch (err) { // get out - console.log("FUCK!", err); + console.log(`FUCK! (user ${user.username}, ip ${user.address})`, err); user.connection.close(); return; } @@ -275,15 +290,21 @@ class VirtualMachine extends EventEmitter { case Shared.MessageType.Mouse: if(user != this.queue.CurrentUser()) return; - if(this.display == null) return; - this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons); + if(this.display == null) + return; + this.display.MouseEvent((message as Shared.MouseMessage).x, (message as Shared.MouseMessage).y, (message as Shared.MouseMessage).buttons); break; case Shared.MessageType.Key: + console.log("GOT key event", (message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed) if(user != this.queue.CurrentUser()) return; - if(this.display == null) return; + if(this.display == null) + return; + + console.log("valid key event", (message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed); + this.display.KeyboardEvent((message as Shared.KeyMessage).keysym, (message as Shared.KeyMessage).pressed); break; // ignore unhandlable messages (we won't get any invalid ones because they will cause a throw) @@ -372,7 +393,7 @@ class VirtualMachine extends EventEmitter { } export class SocketComputerServer { - private vm: VirtualMachine = null; + private vm: SocketVM = null; private fastify: FastifyInstance = fastify({ exposeHeadRoutes: false }); @@ -386,8 +407,8 @@ export class SocketComputerServer { try { console.log('Backend starting...'); - // create teh VM!!!! - await this.CreateVM(); + // create and start teh VMxorz!!!! + await this.InitVM(); await this.fastify.listen({ host: '127.0.0.1', @@ -398,22 +419,26 @@ export class SocketComputerServer { } } - async CreateVM() { + async InitVM() { let diskpath = '/srv/collabvm/vms/socket1/socket1.qcow2'; let slotDef: QemuVmDefinition = Slot_PCDef('2G', '-netdev user,id=vm.wan', 'rtl8139', await GenMacAddress(), true, diskpath, 'qcow2'); setSnapshot(true); // create the slot for real! - this.vm = new VirtualMachine(new QemuVM(slotDef)); + this.vm = new SocketVM(new QemuVM(slotDef)); await this.vm.Start(); // boot it up } CTRoutes(app: FastifyInstance) { let self = this; - app.get('/', { websocket: true }, (connection: fastifyWebsocket.WebSocket) => { - new VMUser(connection, self.vm); + app.get('/', { websocket: true }, (connection: fastifyWebsocket.WebSocket, req: FastifyRequest) => { + let address = req.ip; + if(req.headers["cf-connecting-ip"] !== undefined) { + address = req.headers["cf-connecting-ip"] as string; + } + new VMUser(connection, self.vm, address); }); } } diff --git a/shared/src/Protocol.ts b/shared/src/Protocol.ts index 43605c1..0473b71 100644 --- a/shared/src/Protocol.ts +++ b/shared/src/Protocol.ts @@ -155,7 +155,7 @@ export class MessageEncoder { } SetRemUserMessage(user: string) { - this.SetTypeCode(MessageType.AddUser); + this.SetTypeCode(MessageType.RemUser); this.struct.WriteString(user); } diff --git a/webapp/src/index.html b/webapp/src/index.html index 2a34a15..b0780f3 100644 --- a/webapp/src/index.html +++ b/webapp/src/index.html @@ -12,7 +12,7 @@
- +