improve message queue logic

This commit is contained in:
Elijah R 2023-12-18 16:55:45 -05:00
parent d9501602ec
commit 60989d43a0

View file

@ -12,6 +12,7 @@ export default class VM {
#socket : Socket; #socket : Socket;
#writeLock : Mutex = new Mutex(); #writeLock : Mutex = new Mutex();
#messageQueue : Queue<protocol.ProtocolMessage> = new Queue<protocol.ProtocolMessage>(); #messageQueue : Queue<protocol.ProtocolMessage> = new Queue<protocol.ProtocolMessage>();
#messageQueueRunning : boolean = false;
#nopTimeout : NodeJS.Timeout | null = null; #nopTimeout : NodeJS.Timeout | null = null;
#reconnectTimeout : NodeJS.Timeout | null = null; #reconnectTimeout : NodeJS.Timeout | null = null;
isConnectedToVM : boolean = false; isConnectedToVM : boolean = false;
@ -50,10 +51,6 @@ export default class VM {
}); });
this.#socket.on('data', (data) => this.#onData(data)); this.#socket.on('data', (data) => this.#onData(data));
this.#events = new EventEmitter(); this.#events = new EventEmitter();
this.#events.on('ack', () => {
if (this.#messageQueue.size > 0)
this.messageQueueLoop();
})
} }
UploadFile(file : File) { UploadFile(file : File) {
@ -66,15 +63,24 @@ export default class VM {
messageQueueLoop() { messageQueueLoop() {
return new Promise(async (res, rej) => { return new Promise(async (res, rej) => {
this.#messageQueueRunning = true;
while (this.#messageQueue.size > 0) {
var msg = this.#messageQueue.dequeue(); var msg = this.#messageQueue.dequeue();
var payload = msgpack.encode(msg); var payload = msgpack.encode(msg);
await this.#sendMessage(payload); await this.#sendMessage(payload);
await new Promise<void>((res, rej) => {
this.#events.once('ack', () => {
res();
});
});
}
this.#messageQueueRunning = false;
}) })
} }
#enqueueMessage(msg : protocol.ProtocolMessage) { #enqueueMessage(msg : protocol.ProtocolMessage) {
this.#messageQueue.enqueue(msg); this.#messageQueue.enqueue(msg);
if (this.#messageQueue.size === 1) this.messageQueueLoop(); if (this.#messageQueueRunning) this.messageQueueLoop();
} }
#sendMessage(data : Buffer) : Promise<void> { #sendMessage(data : Buffer) : Promise<void> {
@ -118,7 +124,6 @@ export default class VM {
this.#noNop = false; this.#noNop = false;
if (!this.isConnectedToVM) { if (!this.isConnectedToVM) {
log("INFO", `Got connection to VM at ${this.#id}`); log("INFO", `Got connection to VM at ${this.#id}`);
if (this.#messageQueue.size > 0) this.messageQueueLoop();
this.isConnectedToVM = true; this.isConnectedToVM = true;
} }
switch (msg.Operation) { switch (msg.Operation) {
@ -138,6 +143,7 @@ export default class VM {
} else { } else {
this.isConnectedToVM = false; this.isConnectedToVM = false;
clearTimeout(this.#nopTimeout!); clearTimeout(this.#nopTimeout!);
this.#messageQueue.clear();
log("INFO", `Connection to VM at ${this.#id} timed out`); log("INFO", `Connection to VM at ${this.#id} timed out`);
} }
} }