X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f76ad2f55bf4326383077456d7414aa8c023ea2e;hb=e1af34e6d9705f4ed27c94021b00cd421038ebe3;hp=30b4d01f9846cd2c2327d5a52e4a6be594e8d62c;hpb=daf86646c7cdb484ec05165a49c12224da11a4c7;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 30b4d01f..f76ad2f5 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import { existsSync } from 'node:fs' +import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, @@ -105,7 +106,9 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } this.checkNumberOfWorkers(this.numberOfWorkers) this.checkFilePath(this.filePath) @@ -186,7 +189,7 @@ export abstract class AbstractPool< ) } else if (max === 0) { throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) } else if (min === max) { throw new RangeError( @@ -359,14 +362,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -387,7 +390,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + (workerNode) => workerNode.usage.runTime?.median ?? 0 ) ) ) @@ -400,14 +403,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -428,7 +431,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + (workerNode) => workerNode.usage.waitTime?.median ?? 0 ) ) ) @@ -521,7 +524,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + (workerNode) => workerNode.worker === worker ) } @@ -533,7 +536,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - workerNode => workerNode.info.id === workerId + (workerNode) => workerNode.info.id === workerId ) } @@ -615,33 +618,55 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - return ( - this.workerNodes.findIndex( - workerNode => - workerNode.info.ready && workerNode.usage.tasks.executing === 0 - ) === -1 - ) + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } else { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 + ) + } } /** @inheritDoc */ - public async execute (data?: Data, name?: string): Promise { + public async execute ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ): Promise { return await new Promise((resolve, reject) => { + if (name != null && typeof name !== 'string') { + reject(new TypeError('name argument must be a string')) + } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + transferList, timestamp, workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() + taskId: randomUUID() } - this.promiseResponseMap.set(task.id as string, { + this.promiseResponseMap.set(task.taskId as string, { resolve, reject, workerNodeKey @@ -669,6 +694,23 @@ export abstract class AbstractPool< ) } + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + const waitForKillResponse = new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error(`Worker ${workerId} kill message handling failed`)) + } + }) + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + await waitForKillResponse + } + /** * Terminates the worker node given its worker node key. * @@ -839,10 +881,12 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. * @param message - The message. + * @param transferList - The optional array of transferable objects. */ protected abstract sendToWorker ( workerNodeKey: number, - message: MessageValue + message: MessageValue, + transferList?: TransferListItem[] ): void /** @@ -862,7 +906,7 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { + worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false @@ -899,7 +943,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, message => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -907,7 +951,7 @@ export abstract class AbstractPool< // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && @@ -1032,12 +1076,12 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return message => { + return (message) => { this.checkMessageWorkerId(message) if (message.ready != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.id != null) { + } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) } @@ -1054,7 +1098,9 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) + const promiseResponse = this.promiseResponseMap.get( + message.taskId as string + ) if (promiseResponse != null) { if (message.taskError != null) { this.emitter?.emit(PoolEvents.taskError, message.taskError) @@ -1064,7 +1110,7 @@ export abstract class AbstractPool< } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.id as string) + this.promiseResponseMap.delete(message.taskId as string) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 && @@ -1143,7 +1189,7 @@ export abstract class AbstractPool< */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(workerNodeKey, task) + this.sendToWorker(workerNodeKey, task, task.transferList) } private enqueueTask (workerNodeKey: number, task: Task): number {