X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=712de0b7da4c38c2808e1be61da7fced8e372da0;hb=884743b122a66be45c94d83d9230e28a9ab43836;hp=c8a87251df2f5701e92c16603870fa17a30a5d38;hpb=3a5027122ca6401ae1d755843b20f714c61e3240;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c8a87251..712de0b7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -588,7 +588,8 @@ export abstract class AbstractPool< } /** - * The pool readiness boolean status. + * Whether the pool is ready or not. + * @returns The pool readiness boolean status. */ private get ready (): boolean { if (this.empty) { @@ -606,7 +607,8 @@ export abstract class AbstractPool< } /** - * The pool emptiness boolean status. + * Whether the pool is empty or not. + * @returns The pool emptiness boolean status. */ protected get empty (): boolean { return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 @@ -658,7 +660,7 @@ export abstract class AbstractPool< throw new Error('Worker message received without worker id') } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( - `Worker message received from unknown worker '${message.workerId}'` + `Worker message received from unknown worker '${message.workerId.toString()}'` ) } } @@ -820,8 +822,7 @@ export abstract class AbstractPool< /** * Whether the pool is full or not. - * - * The pool filling boolean status. + * @returns The pool fullness boolean status. */ protected get full (): boolean { return ( @@ -832,8 +833,7 @@ export abstract class AbstractPool< /** * Whether the pool is busy or not. - * - * The pool busyness boolean status. + * @returns The pool busyness boolean status. */ protected abstract get busy (): boolean @@ -891,7 +891,11 @@ export abstract class AbstractPool< } else { reject( new Error( - `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'` + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + message.workerError?.message + }'` ) ) } @@ -939,7 +943,9 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${errorResponse?.workerId} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${ + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions errorResponse?.workerError?.message }'` ) @@ -1185,7 +1191,6 @@ export abstract class AbstractPool< const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, - data: data ?? ({} as Data), priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( @@ -1220,11 +1225,36 @@ export abstract class AbstractPool< }) } + + /** @inheritDoc */ + public mapExecute ( + data: Iterable, + name?: string, + transferList?: readonly TransferListItem[] + ): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (data == null) { + throw new TypeError('data argument must be a defined iterable') + } + if (typeof data[Symbol.iterator] !== 'function') { + throw new TypeError('data argument must be an iterable') + } + if (!Array.isArray(data)) { + data = [...data] + } + return Promise.all( + (data as Data[]).map(data => this.execute(data, name, transferList)) + ) + } + /** * Starts the minimum number of workers. - * @param initWorkerNodeUsage + * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false */ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { + if (this.minimumNumberOfWorkers === 0) { + return + } this.startingMinimumNumberOfWorkers = true while ( this.workerNodes.reduce( @@ -1285,6 +1315,7 @@ export abstract class AbstractPool< private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey] == null) { resolve() return @@ -1296,7 +1327,8 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${message.workerId}` + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Kill message handling failed on worker ${message.workerId?.toString()}` ) ) } @@ -1352,6 +1384,7 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing @@ -1392,7 +1425,7 @@ export abstract class AbstractPool< message: MessageValue ): void { let needWorkerChoiceStrategiesUpdate = false - + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) @@ -1570,7 +1603,7 @@ export abstract class AbstractPool< ) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.terminate().catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1789,7 +1822,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] - + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1808,7 +1841,7 @@ export abstract class AbstractPool< previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] - + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } @@ -1837,7 +1870,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] - + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } @@ -1865,7 +1898,7 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo == null) { throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` + `Worker node with key '${workerNodeKey.toString()}' not found in pool` ) } if ( @@ -1981,7 +2014,7 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo == null) { throw new Error( - `Worker node with key '${workerNodeKey}' not found in pool` + `Worker node with key '${workerNodeKey.toString()}' not found in pool` ) } workerInfo.stealing = true @@ -2003,7 +2036,7 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. - * @param message + * @param message - The message received from the worker. */ protected readonly workerMessageListener = ( message: MessageValue @@ -2038,7 +2071,8 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionsProperties } = message if (ready == null || !ready) { - throw new Error(`Worker ${workerId} failed to initialize`) + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + throw new Error(`Worker ${workerId?.toString()} failed to initialize`) } const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) const workerNode = this.workerNodes[workerNodeKey] @@ -2074,10 +2108,12 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode?.emit('taskFinished', taskId) if ( this.opts.enableTasksQueue === true && !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition workerNode != null ) { const workerNodeTasksUsage = workerNode.usage.tasks