import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import { existsSync } from 'node:fs'
import { type TransferListItem } from 'node:worker_threads'
import type {
MessageValue,
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { version } from './version'
import { WorkerNode } from './worker-node'
+import {
+ checkFilePath,
+ checkValidTasksQueueOptions,
+ checkValidWorkerChoiceStrategy
+} from './utils'
/**
* Base class that implements some shared logic for all poolifier pools.
)
}
this.checkNumberOfWorkers(this.numberOfWorkers)
- this.checkFilePath(this.filePath)
+ checkFilePath(this.filePath)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkFilePath (filePath: string): void {
- if (
- filePath == null ||
- typeof filePath !== 'string' ||
- (typeof filePath === 'string' && filePath.trim().length === 0)
- ) {
- throw new Error('Please specify a file with a worker implementation')
- }
- if (!existsSync(filePath)) {
- throw new Error(`Cannot find the worker file '${filePath}'`)
- }
- }
-
private checkNumberOfWorkers (numberOfWorkers: number): void {
if (numberOfWorkers == null) {
throw new Error(
}
}
- protected checkDynamicPoolSize (min: number, max: number): void {
- if (this.type === PoolTypes.dynamic) {
- if (max == null) {
- throw new TypeError(
- 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
- )
- } else if (!Number.isSafeInteger(max)) {
- throw new TypeError(
- 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
- )
- } else if (min > max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
- )
- } else if (max === 0) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
- )
- } else if (min === max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
- )
- }
- }
- }
-
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
- this.checkValidWorkerChoiceStrategy(
+ checkValidWorkerChoiceStrategy(
opts.workerChoiceStrategy as WorkerChoiceStrategy
)
this.opts.workerChoiceStrategy =
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- this.checkValidTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
- )
+ checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
opts.tasksQueueOptions as TasksQueueOptions
)
}
}
- private checkValidWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy
- ): void {
- if (
- workerChoiceStrategy != null &&
- !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
- ) {
- throw new Error(
- `Invalid worker choice strategy '${workerChoiceStrategy}'`
- )
- }
- }
-
private checkValidWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
}
}
- private checkValidTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
- ): void {
- if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
- throw new TypeError('Invalid tasks queue options: must be a plain object')
- }
- if (
- tasksQueueOptions?.concurrency != null &&
- !Number.isSafeInteger(tasksQueueOptions.concurrency)
- ) {
- throw new TypeError(
- 'Invalid worker node tasks concurrency: must be an integer'
- )
- }
- if (
- tasksQueueOptions?.concurrency != null &&
- tasksQueueOptions.concurrency <= 0
- ) {
- throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
- )
- }
- if (
- tasksQueueOptions?.size != null &&
- !Number.isSafeInteger(tasksQueueOptions.size)
- ) {
- throw new TypeError(
- 'Invalid worker node tasks queue size: must be an integer'
- )
- }
- if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
- throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
- )
- }
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
- this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
+ checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
this.opts.workerChoiceStrategy
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
if (this.opts.enableTasksQueue === true) {
- this.checkValidTasksQueueOptions(tasksQueueOptions)
+ checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
workerNodeKey: number,
message: MessageValue<Data>
): Promise<boolean> {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
return await new Promise<boolean>((resolve, reject) => {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
this.registerWorkerMessageListener(workerNodeKey, message => {
if (
message.workerId === workerId &&
) {
reject(
new Error(
- `Task function operation ${
+ `Task function operation '${
message.taskFunctionOperation as string
- } failed on worker ${message.workerId}`
+ }' failed on worker ${message.workerId} with error: '${
+ message.workerError?.message as string
+ }'`
)
)
}
}
private async sendTaskFunctionOperationToWorkers (
- message: Omit<MessageValue<Data>, 'workerId'>
+ message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived = new Array<MessageValue<Data | Response>>()
message => message.taskFunctionOperationStatus === false
)
) {
+ const errorResponse = responsesReceived.find(
+ response => response.taskFunctionOperationStatus === false
+ )
reject(
new Error(
- `Task function operation ${
+ `Task function operation '${
message.taskFunctionOperation as string
- } failed on worker ${message.workerId as number}`
+ }' failed on worker ${
+ errorResponse?.workerId as number
+ } with error: '${
+ errorResponse?.workerError?.message as string
+ }'`
)
)
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- taskFunction: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response>
): Promise<boolean> {
- this.taskFunctions.set(name, taskFunction)
- return await this.sendTaskFunctionOperationToWorkers({
+ if (typeof name !== 'string') {
+ throw new TypeError('name argument must be a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError('name argument must not be an empty string')
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError('fn argument must be a function')
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
- taskFunction: taskFunction.toString()
+ taskFunction: fn.toString()
})
+ this.taskFunctions.set(name, fn)
+ return opResult
}
/** @inheritDoc */
'Cannot remove a task function not handled on the pool side'
)
}
- this.taskFunctions.delete(name)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
+ this.deleteTaskFunctionWorkerUsages(name)
+ this.taskFunctions.delete(name)
+ return opResult
}
/** @inheritDoc */
})
}
+ private deleteTaskFunctionWorkerUsages (name: string): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&