From adc3c320dca41a7dd3605a481266158cdae908cd Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 8 Apr 2023 23:34:50 +0200 Subject: [PATCH] refactor: prepare the code to handle task abstraction at execute MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .eslintrc.js | 2 + src/pools/abstract-pool.ts | 64 +++++++++++++------ .../selection-strategies.test.js | 4 +- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 1d684007..70a8ec3c 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -42,7 +42,9 @@ module.exports = defineConfig({ 'cpus', 'ctx', 'deprecations', + 'dequeue', 'ecma', + 'enqueue', 'enum', 'esm', 'fibonacci', diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0fb695fb..b6222091 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -75,7 +75,7 @@ export abstract class AbstractPool< this.checkFilePath(this.filePath) this.checkPoolOptions(this.opts) - this.chooseWorker.bind(this) + this.chooseWorkerNode.bind(this) this.internalExecute.bind(this) this.checkAndEmitFull.bind(this) this.checkAndEmitBusy.bind(this) @@ -212,16 +212,24 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data: Data): Promise { - const [workerNodeKey, worker] = this.chooseWorker() - const messageId = crypto.randomUUID() - const res = this.internalExecute(workerNodeKey, worker, messageId) - this.checkAndEmitFull() - this.checkAndEmitBusy() - this.sendToWorker(worker, { + const [workerNodeKey, workerNode] = this.chooseWorkerNode() + const submittedTask: Task = { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), - id: messageId - }) + id: crypto.randomUUID() + } + const res = this.internalExecute(workerNodeKey, workerNode, submittedTask) + let currentTask: Task + // FIXME: Add sensible conditions to start tasks queuing on the worker node. + if (this.tasksQueueLength(workerNodeKey) > 0) { + currentTask = this.dequeueTask(workerNodeKey) as Task + this.enqueueTask(workerNodeKey, submittedTask) + } else { + currentTask = submittedTask + } + this.sendToWorker(workerNode.worker, currentTask) + this.checkAndEmitFull() + this.checkAndEmitBusy() // eslint-disable-next-line @typescript-eslint/return-await return res } @@ -305,31 +313,31 @@ export abstract class AbstractPool< * * The default uses a round robin algorithm to distribute the load. * - * @returns [worker node key, worker]. + * @returns [worker node key, worker node]. */ - protected chooseWorker (): [number, Worker] { + protected chooseWorkerNode (): [number, WorkerNode] { let workerNodeKey: number if ( this.type === PoolType.DYNAMIC && !this.full && this.findFreeWorkerNodeKey() === -1 ) { - const createdWorker = this.createAndSetupWorker() - this.registerWorkerMessageListener(createdWorker, message => { + const workerCreated = this.createAndSetupWorker() + this.registerWorkerMessageListener(workerCreated, message => { if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && - this.getWorkerTasksUsage(createdWorker)?.running === 0) + this.getWorkerTasksUsage(workerCreated)?.running === 0) ) { // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(createdWorker) + void this.destroyWorker(workerCreated) } }) - workerNodeKey = this.getWorkerNodeKey(createdWorker) + workerNodeKey = this.getWorkerNodeKey(workerCreated) } else { workerNodeKey = this.workerChoiceStrategyContext.execute() } - return [workerNodeKey, this.workerNodes[workerNodeKey].worker] + return [workerNodeKey, this.workerNodes[workerNodeKey]] } /** @@ -415,12 +423,16 @@ export abstract class AbstractPool< private async internalExecute ( workerNodeKey: number, - worker: Worker, - messageId: string + workerNode: WorkerNode, + task: Task ): Promise { this.beforePromiseResponseHook(workerNodeKey) return await new Promise((resolve, reject) => { - this.promiseResponseMap.set(messageId, { resolve, reject, worker }) + this.promiseResponseMap.set(task.id, { + resolve, + reject, + worker: workerNode.worker + }) }) } @@ -507,4 +519,16 @@ export abstract class AbstractPool< this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) } + + protected enqueueTask (workerNodeKey: number, task: Task): void { + this.workerNodes[workerNodeKey].tasksQueue.push(task) + } + + protected dequeueTask (workerNodeKey: number): Task | undefined { + return this.workerNodes[workerNodeKey].tasksQueue.shift() + } + + protected tasksQueueLength (workerNodeKey: number): number { + return this.workerNodes[workerNodeKey].tasksQueue.length + } } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 796c84ba..2fff1b13 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -146,14 +146,14 @@ describe('Selection strategies test suite', () => { ) let results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorker()[1].id) + results.add(pool.chooseWorkerNode()[1].worker.id) } expect(results.size).toBe(max) await pool.destroy() pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js') results = new Set() for (let i = 0; i < max; i++) { - results.add(pool.chooseWorker()[1].threadId) + results.add(pool.chooseWorkerNode()[1].worker.threadId) } expect(results.size).toBe(max) await pool.destroy() -- 2.34.1