From 3a0a2167df1e206879307078a48aaf4a58db4ec6 Mon Sep 17 00:00:00 2001 From: Elijah Date: Sun, 10 Dec 2023 14:08:17 -0500 Subject: [PATCH] Add ACK operation to Protocol --- src/Protocol.ts | 1 + src/VM.ts | 31 +++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/Protocol.ts b/src/Protocol.ts index ba7e5e2..5c7601e 100644 --- a/src/Protocol.ts +++ b/src/Protocol.ts @@ -6,4 +6,5 @@ export interface ProtocolMessage { export enum ProtocolOperation { UploadFile = 0, + ACK = 1, } \ No newline at end of file diff --git a/src/VM.ts b/src/VM.ts index a4039a4..9af6ac7 100644 --- a/src/VM.ts +++ b/src/VM.ts @@ -5,38 +5,42 @@ import {Mutex} from 'async-mutex'; import log from './log.js'; import Queue from 'mnemonist/queue.js'; import File from "./File.js"; +import { EventEmitter } from "events"; export default class VM { #socketpath : string; #socket : Socket; #writeLock : Mutex = new Mutex(); #fileQueue : Queue = new Queue(); - #isFileQueueRunning : boolean = false; connected : boolean = false; + #events : EventEmitter; constructor(socketpath : string) { this.#socketpath = socketpath; this.#socket = new Socket(); + this.#socket.setEncoding("binary"); this.#socket.connect(socketpath); this.#socket.on('connect', () => { this.connected = true; log("INFO", `Connected to VM at ${socketpath}`); }); + this.#socket.on('data', (data) => this.#onData(data)); + this.#events = new EventEmitter(); + this.#events.on('ack', () => { + if (this.#fileQueue.size > 0) + this.fileQueueLoop(); + }) } UploadFile(file : File) { this.#fileQueue.enqueue(file); - if (!this.#isFileQueueRunning) + if (this.#fileQueue.size === 1) this.fileQueueLoop(); } fileQueueLoop() { return new Promise(async (res, rej) => { - this.#isFileQueueRunning = true; - while (this.#fileQueue.size > 0) { var file = this.#fileQueue.dequeue(); await this.PushFile(file!); - } - this.#isFileQueueRunning = false; }) } @@ -76,6 +80,21 @@ export default class VM { }); }); } + + #onData(data : Buffer) { + var payload = data.subarray(4); + var header = data.readUInt32LE(0); + if (header !== payload.length) { + log("WARN", `Received message with invalid length header ${header}`); + return; + } + var msg = msgpack.decode(payload) as protocol.ProtocolMessage; + switch (msg.Operation) { + case protocol.ProtocolOperation.ACK: + this.#events.emit('ack'); + break; + } + } } function sleep(ms : number) : Promise {