Add ACK operation to Protocol

This commit is contained in:
Elijah 2023-12-10 14:08:17 -05:00
parent 8412aeb1af
commit 3a0a2167df
2 changed files with 26 additions and 6 deletions

View file

@ -6,4 +6,5 @@ export interface ProtocolMessage {
export enum ProtocolOperation { export enum ProtocolOperation {
UploadFile = 0, UploadFile = 0,
ACK = 1,
} }

View file

@ -5,38 +5,42 @@ import {Mutex} from 'async-mutex';
import log from './log.js'; import log from './log.js';
import Queue from 'mnemonist/queue.js'; import Queue from 'mnemonist/queue.js';
import File from "./File.js"; import File from "./File.js";
import { EventEmitter } from "events";
export default class VM { export default class VM {
#socketpath : string; #socketpath : string;
#socket : Socket; #socket : Socket;
#writeLock : Mutex = new Mutex(); #writeLock : Mutex = new Mutex();
#fileQueue : Queue<File> = new Queue<File>(); #fileQueue : Queue<File> = new Queue<File>();
#isFileQueueRunning : boolean = false;
connected : boolean = false; connected : boolean = false;
#events : EventEmitter;
constructor(socketpath : string) { constructor(socketpath : string) {
this.#socketpath = socketpath; this.#socketpath = socketpath;
this.#socket = new Socket(); this.#socket = new Socket();
this.#socket.setEncoding("binary");
this.#socket.connect(socketpath); this.#socket.connect(socketpath);
this.#socket.on('connect', () => { this.#socket.on('connect', () => {
this.connected = true; this.connected = true;
log("INFO", `Connected to VM at ${socketpath}`); log("INFO", `Connected to VM at ${socketpath}`);
}); });
this.#socket.on('data', (data) => this.#onData(data));
this.#events = new EventEmitter();
this.#events.on('ack', () => {
if (this.#fileQueue.size > 0)
this.fileQueueLoop();
})
} }
UploadFile(file : File) { UploadFile(file : File) {
this.#fileQueue.enqueue(file); this.#fileQueue.enqueue(file);
if (!this.#isFileQueueRunning) if (this.#fileQueue.size === 1)
this.fileQueueLoop(); this.fileQueueLoop();
} }
fileQueueLoop() { fileQueueLoop() {
return new Promise(async (res, rej) => { return new Promise(async (res, rej) => {
this.#isFileQueueRunning = true;
while (this.#fileQueue.size > 0) {
var file = this.#fileQueue.dequeue(); var file = this.#fileQueue.dequeue();
await this.PushFile(file!); await this.PushFile(file!);
}
this.#isFileQueueRunning = false;
}) })
} }
@ -76,6 +80,21 @@ export default class VM {
}); });
}); });
} }
#onData(data : Buffer) {
var payload = data.subarray(4);
var header = data.readUInt32LE(0);
if (header !== payload.length) {
log("WARN", `Received message with invalid length header ${header}`);
return;
}
var msg = msgpack.decode(payload) as protocol.ProtocolMessage;
switch (msg.Operation) {
case protocol.ProtocolOperation.ACK:
this.#events.emit('ack');
break;
}
}
} }
function sleep(ms : number) : Promise<void> { function sleep(ms : number) : Promise<void> {