From 4f7fa42a78e36031814421ba6310586da9dd46d2 Mon Sep 17 00:00:00 2001 From: Shinigami Date: Sun, 14 Feb 2021 20:02:52 +0100 Subject: [PATCH] Improve chooseWorker function in dynamic pools (#152) --- package-lock.json | 6 +++--- src/pools/abstract-pool.ts | 14 ++++++------ src/pools/cluster/dynamic.ts | 41 ++++++++++++++++-------------------- src/pools/cluster/fixed.ts | 16 +++++++------- src/pools/thread/dynamic.ts | 41 ++++++++++++++++-------------------- src/pools/thread/fixed.ts | 16 +++++++------- 6 files changed, 61 insertions(+), 73 deletions(-) diff --git a/package-lock.json b/package-lock.json index efed1e62..fe7d419b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4538,9 +4538,9 @@ "dev": true }, "uglify-js": { - "version": "3.12.7", - "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.12.7.tgz", - "integrity": "sha512-SIZhkoh+U/wjW+BHGhVwE9nt8tWJspncloBcFapkpGRwNPqcH8pzX36BXe3TPBjzHWPMUZotpCigak/udWNr1Q==", + "version": "3.12.8", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.12.8.tgz", + "integrity": "sha512-fvBeuXOsvqjecUtF/l1dwsrrf5y2BCUk9AOJGzGcm6tE7vegku5u/YvqjyDaAGr422PLoLnrxg3EnRvTqsdC1w==", "dev": true, "optional": true }, diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ac3bd6cc..2ffd292a 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -238,15 +238,13 @@ export abstract class AbstractPool< message: MessageValue ): void - protected abstract registerWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract registerWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void - protected abstract unregisterWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract unregisterWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void protected internalExecute ( worker: Worker, diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 432feb3a..b0bb6973 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,5 +1,5 @@ import type { Worker } from 'cluster' -import type { JSONValue, MessageValue } from '../../utility-types' +import type { JSONValue } from '../../utility-types' import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' @@ -46,31 +46,26 @@ export class DynamicClusterPool< * @returns Cluster worker. */ protected chooseWorker (): Worker { - let worker: Worker | undefined - for (const entry of this.tasks) { - if (entry[1] === 0) { - worker = entry[0] - break + for (const [worker, numberOfTasks] of this.tasks) { + if (numberOfTasks === 0) { + // A worker is free, use it + return worker } } - if (worker) { - // A worker is free, use it - return worker - } else { - if (this.workers.length === this.max) { - this.emitter.emit('FullPool') - return super.chooseWorker() - } - // All workers are busy, create a new worker - const worker = this.createAndSetupWorker() - worker.on('message', (message: MessageValue) => { - if (message.kill) { - this.sendToWorker(worker, { kill: 1 }) - void this.destroyWorker(worker) - } - }) - return worker + if (this.workers.length === this.max) { + this.emitter.emit('FullPool') + return super.chooseWorker() } + + // All workers are busy, create a new worker + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + if (message.kill) { + this.sendToWorker(worker, { kill: 1 }) + void this.destroyWorker(worker) + } + }) + return worker } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 6620d5f3..b2724426 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -66,18 +66,18 @@ export class FixedClusterPool< worker.send(message) } - protected registerWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void + protected registerWorkerMessageListener ( + worker: Worker, + listener: (message: MessageValue) => void ): void { - port.on('message', listener) + worker.on('message', listener) } - protected unregisterWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void + protected unregisterWorkerMessageListener ( + worker: Worker, + listener: (message: MessageValue) => void ): void { - port.removeListener('message', listener) + worker.removeListener('message', listener) } protected createWorker (): Worker { diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 9ab6bf30..fddae466 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,4 +1,4 @@ -import type { JSONValue, MessageValue } from '../../utility-types' +import type { JSONValue } from '../../utility-types' import type { PoolOptions } from '../abstract-pool' import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' @@ -46,31 +46,26 @@ export class DynamicThreadPool< * @returns Thread worker. */ protected chooseWorker (): ThreadWorkerWithMessageChannel { - let worker: ThreadWorkerWithMessageChannel | undefined - for (const entry of this.tasks) { - if (entry[1] === 0) { - worker = entry[0] - break + for (const [worker, numberOfTasks] of this.tasks) { + if (numberOfTasks === 0) { + // A worker is free, use it + return worker } } - if (worker) { - // A worker is free, use it - return worker - } else { - if (this.workers.length === this.max) { - this.emitter.emit('FullPool') - return super.chooseWorker() - } - // All workers are busy, create a new worker - const worker = this.createAndSetupWorker() - worker.port2?.on('message', (message: MessageValue) => { - if (message.kill) { - this.sendToWorker(worker, { kill: 1 }) - void this.destroyWorker(worker) - } - }) - return worker + if (this.workers.length === this.max) { + this.emitter.emit('FullPool') + return super.chooseWorker() } + + // All workers are busy, create a new worker + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + if (message.kill) { + this.sendToWorker(worker, { kill: 1 }) + void this.destroyWorker(worker) + } + }) + return worker } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index bfd6f016..705f124a 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -57,18 +57,18 @@ export class FixedThreadPool< worker.postMessage(message) } - protected registerWorkerMessageListener ( - port: ThreadWorkerWithMessageChannel, - listener: (message: MessageValue) => void + protected registerWorkerMessageListener ( + messageChannel: ThreadWorkerWithMessageChannel, + listener: (message: MessageValue) => void ): void { - port.port2?.on('message', listener) + messageChannel.port2?.on('message', listener) } - protected unregisterWorkerMessageListener ( - port: ThreadWorkerWithMessageChannel, - listener: (message: MessageValue) => void + protected unregisterWorkerMessageListener ( + messageChannel: ThreadWorkerWithMessageChannel, + listener: (message: MessageValue) => void ): void { - port.port2?.removeListener('message', listener) + messageChannel.port2?.removeListener('message', listener) } protected createWorker (): ThreadWorkerWithMessageChannel { -- 2.34.1