message queue shits

This commit is contained in:
Elijah 2023-12-10 15:19:13 -05:00
parent 5557dc1750
commit 5d791150b0

View file

@ -11,7 +11,7 @@ export default class VM {
#socketpath : string; #socketpath : string;
#socket : Socket; #socket : Socket;
#writeLock : Mutex = new Mutex(); #writeLock : Mutex = new Mutex();
#fileQueue : Queue<File> = new Queue<File>(); #messageQueue : Queue<protocol.ProtocolMessage> = new Queue<protocol.ProtocolMessage>();
#nopTimeout : NodeJS.Timeout | null = null; #nopTimeout : NodeJS.Timeout | null = null;
isConnectedToVM : boolean = false; isConnectedToVM : boolean = false;
#noNop : boolean = false; #noNop : boolean = false;
@ -32,35 +32,30 @@ export default class VM {
this.#socket.on('data', (data) => this.#onData(data)); this.#socket.on('data', (data) => this.#onData(data));
this.#events = new EventEmitter(); this.#events = new EventEmitter();
this.#events.on('ack', () => { this.#events.on('ack', () => {
if (this.#fileQueue.size > 0) if (this.#messageQueue.size > 0)
this.fileQueueLoop(); this.messageQueueLoop();
}) })
} }
UploadFile(file : File) { UploadFile(file : File) {
this.#fileQueue.enqueue(file); this.#enqueueMessage({
if (this.#fileQueue.size === 1) Operation: protocol.ProtocolOperation.UploadFile,
this.fileQueueLoop(); Filename: file.Filename,
FileData: file.FileData
});
} }
fileQueueLoop() { messageQueueLoop() {
return new Promise(async (res, rej) => { return new Promise(async (res, rej) => {
var file = this.#fileQueue.dequeue(); var msg = this.#messageQueue.dequeue();
await this.PushFile(file!); var payload = msgpack.encode(msg);
await this.#sendMessage(payload);
}) })
} }
PushFile(file : File) : Promise<void> { #enqueueMessage(msg : protocol.ProtocolMessage) {
return new Promise(async (res, rej) => { this.#messageQueue.enqueue(msg);
const msg : protocol.ProtocolMessage = { if (this.#messageQueue.size === 1) this.messageQueueLoop();
Operation: protocol.ProtocolOperation.UploadFile,
Filename: file.Filename,
FileData: file.FileData
};
var payload = msgpack.encode(msg);
await this.#sendMessage(payload);
res();
});
} }
#sendMessage(data : Buffer) : Promise<void> { #sendMessage(data : Buffer) : Promise<void> {
@ -102,7 +97,7 @@ export default class VM {
if (this.#nopTimeout) clearInterval(this.#nopTimeout); if (this.#nopTimeout) clearInterval(this.#nopTimeout);
this.#nopTimeout = setInterval(() => this.#nopTimeoutFunc(), 5000); this.#nopTimeout = setInterval(() => this.#nopTimeoutFunc(), 5000);
if (!this.isConnectedToVM) { if (!this.isConnectedToVM) {
if (this.#fileQueue.size > 0) this.fileQueueLoop(); if (this.#messageQueue.size > 0) this.messageQueueLoop();
this.isConnectedToVM = true; this.isConnectedToVM = true;
this.#noNop = false; this.#noNop = false;
} }
@ -115,11 +110,9 @@ export default class VM {
#nopTimeoutFunc() { #nopTimeoutFunc() {
if (!this.#noNop) { if (!this.#noNop) {
var payload : protocol.ProtocolMessage = { this.#enqueueMessage({
Operation: protocol.ProtocolOperation.NOP Operation: protocol.ProtocolOperation.NOP
}; });
var data = msgpack.encode(payload);
this.#sendMessage(data);
this.#noNop = true; this.#noNop = true;
} else { } else {
this.isConnectedToVM = false; this.isConnectedToVM = false;