import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import { existsSync } from 'node:fs'
import { type TransferListItem } from 'node:worker_threads'
import type {
MessageValue,
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
import { version } from './version'
import { WorkerNode } from './worker-node'
+import {
+ checkFilePath,
+ checkValidTasksQueueOptions,
+ checkValidWorkerChoiceStrategy
+} from './utils'
/**
* Base class that implements some shared logic for all poolifier pools.
)
}
this.checkNumberOfWorkers(this.numberOfWorkers)
- this.checkFilePath(this.filePath)
+ checkFilePath(this.filePath)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkFilePath (filePath: string): void {
- if (
- filePath == null ||
- typeof filePath !== 'string' ||
- (typeof filePath === 'string' && filePath.trim().length === 0)
- ) {
- throw new Error('Please specify a file with a worker implementation')
- }
- if (!existsSync(filePath)) {
- throw new Error(`Cannot find the worker file '${filePath}'`)
- }
- }
-
private checkNumberOfWorkers (numberOfWorkers: number): void {
if (numberOfWorkers == null) {
throw new Error(
}
}
- protected checkDynamicPoolSize (min: number, max: number): void {
- if (this.type === PoolTypes.dynamic) {
- if (max == null) {
- throw new TypeError(
- 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
- )
- } else if (!Number.isSafeInteger(max)) {
- throw new TypeError(
- 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
- )
- } else if (min > max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
- )
- } else if (max === 0) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
- )
- } else if (min === max) {
- throw new RangeError(
- 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
- )
- }
- }
- }
-
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
this.opts.startWorkers = opts.startWorkers ?? true
+ checkValidWorkerChoiceStrategy(
+ opts.workerChoiceStrategy as WorkerChoiceStrategy
+ )
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
- this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.checkValidWorkerChoiceStrategyOptions(
+ opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+ )
this.opts.workerChoiceStrategyOptions = {
...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
...opts.workerChoiceStrategyOptions
}
- this.checkValidWorkerChoiceStrategyOptions(
- this.opts.workerChoiceStrategyOptions
- )
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
if (this.opts.enableTasksQueue) {
- this.checkValidTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
- )
+ checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
opts.tasksQueueOptions as TasksQueueOptions
)
}
}
- private checkValidWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy
- ): void {
- if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
- throw new Error(
- `Invalid worker choice strategy '${workerChoiceStrategy}'`
- )
- }
- }
-
private checkValidWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
- if (!isPlainObject(workerChoiceStrategyOptions)) {
+ if (
+ workerChoiceStrategyOptions != null &&
+ !isPlainObject(workerChoiceStrategyOptions)
+ ) {
throw new TypeError(
'Invalid worker choice strategy options: must be a plain object'
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
!Number.isSafeInteger(workerChoiceStrategyOptions.retries)
) {
throw new TypeError(
)
}
if (
- workerChoiceStrategyOptions.retries != null &&
+ workerChoiceStrategyOptions?.retries != null &&
workerChoiceStrategyOptions.retries < 0
) {
throw new RangeError(
)
}
if (
- workerChoiceStrategyOptions.weights != null &&
+ workerChoiceStrategyOptions?.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
) {
throw new Error(
)
}
if (
- workerChoiceStrategyOptions.measurement != null &&
+ workerChoiceStrategyOptions?.measurement != null &&
!Object.values(Measurements).includes(
workerChoiceStrategyOptions.measurement
)
}
}
- private checkValidTasksQueueOptions (
- tasksQueueOptions: TasksQueueOptions
- ): void {
- if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
- throw new TypeError('Invalid tasks queue options: must be a plain object')
- }
- if (
- tasksQueueOptions?.concurrency != null &&
- !Number.isSafeInteger(tasksQueueOptions?.concurrency)
- ) {
- throw new TypeError(
- 'Invalid worker node tasks concurrency: must be an integer'
- )
- }
- if (
- tasksQueueOptions?.concurrency != null &&
- tasksQueueOptions?.concurrency <= 0
- ) {
- throw new RangeError(
- `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
- )
- }
- if (
- tasksQueueOptions?.size != null &&
- !Number.isSafeInteger(tasksQueueOptions?.size)
- ) {
- throw new TypeError(
- 'Invalid worker node tasks queue size: must be an integer'
- )
- }
- if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
- throw new RangeError(
- `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
- )
- }
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
workerChoiceStrategy: WorkerChoiceStrategy,
workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
- this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
+ checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
this.opts.workerChoiceStrategy
tasksQueueOptions?: TasksQueueOptions
): void {
if (this.opts.enableTasksQueue === true && !enable) {
+ this.unsetTaskStealing()
+ this.unsetTasksStealingOnBackPressure()
this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
if (this.opts.enableTasksQueue === true) {
- this.checkValidTasksQueueOptions(tasksQueueOptions)
+ checkValidTasksQueueOptions(tasksQueueOptions)
this.opts.tasksQueueOptions =
this.buildTasksQueueOptions(tasksQueueOptions)
this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+ if (this.opts.tasksQueueOptions.taskStealing === true) {
+ this.setTaskStealing()
+ } else {
+ this.unsetTaskStealing()
+ }
+ if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+ this.setTasksStealingOnBackPressure()
+ } else {
+ this.unsetTasksStealingOnBackPressure()
+ }
} else if (this.opts.tasksQueueOptions != null) {
delete this.opts.tasksQueueOptions
}
}
- private setTasksQueueSize (size: number): void {
- for (const workerNode of this.workerNodes) {
- workerNode.tasksQueueBackPressureSize = size
- }
- }
-
private buildTasksQueueOptions (
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
}
}
+ private setTasksQueueSize (size: number): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.tasksQueueBackPressureSize = size
+ }
+ }
+
+ private setTaskStealing (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ }
+
+ private unsetTaskStealing (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ delete this.workerNodes[workerNodeKey].onEmptyQueue
+ }
+ }
+
+ private setTasksStealingOnBackPressure (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
+ }
+
+ private unsetTasksStealingOnBackPressure (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ delete this.workerNodes[workerNodeKey].onBackPressure
+ }
+ }
+
/**
* Whether the pool is full or not.
*
workerNodeKey: number,
message: MessageValue<Data>
): Promise<boolean> {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
return await new Promise<boolean>((resolve, reject) => {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
this.registerWorkerMessageListener(workerNodeKey, message => {
if (
message.workerId === workerId &&
) {
reject(
new Error(
- `Task function operation ${
+ `Task function operation '${
message.taskFunctionOperation as string
- } failed on worker ${message.workerId}`
+ }' failed on worker ${message.workerId} with error: '${
+ message.workerError?.message as string
+ }'`
)
)
}
}
private async sendTaskFunctionOperationToWorkers (
- message: Omit<MessageValue<Data>, 'workerId'>
+ message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived = new Array<MessageValue<Data | Response>>()
message => message.taskFunctionOperationStatus === false
)
) {
+ const errorResponse = responsesReceived.find(
+ response => response.taskFunctionOperationStatus === false
+ )
reject(
new Error(
- `Task function operation ${
+ `Task function operation '${
message.taskFunctionOperation as string
- } failed on worker ${message.workerId as number}`
+ }' failed on worker ${
+ errorResponse?.workerId as number
+ } with error: '${
+ errorResponse?.workerError?.message as string
+ }'`
)
)
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- taskFunction: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response>
): Promise<boolean> {
- this.taskFunctions.set(name, taskFunction)
- return await this.sendTaskFunctionOperationToWorkers({
+ if (typeof name !== 'string') {
+ throw new TypeError('name argument must be a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError('name argument must not be an empty string')
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError('fn argument must be a function')
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
- taskFunction: taskFunction.toString()
+ taskFunction: fn.toString()
})
+ this.taskFunctions.set(name, fn)
+ return opResult
}
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
- this.taskFunctions.delete(name)
- return await this.sendTaskFunctionOperationToWorkers({
+ if (!this.taskFunctions.has(name)) {
+ throw new Error(
+ 'Cannot remove a task function not handled on the pool side'
+ )
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
+ this.deleteTaskFunctionWorkerUsages(name)
+ this.taskFunctions.delete(name)
+ return opResult
}
/** @inheritDoc */
})
}
+ private deleteTaskFunctionWorkerUsages (name: string): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
this.workerNodes[workerNodeKey].closeChannel()
this.emitter?.emit(PoolEvents.error, error)
if (
- this.opts.restartWorkerOnError === true &&
this.started &&
- !this.starting
+ !this.starting &&
+ this.opts.restartWorkerOnError === true
) {
if (workerInfo.dynamic) {
this.createAndSetupDynamicWorkerNode()
this.createAndSetupWorkerNode()
}
}
- if (this.opts.enableTasksQueue === true) {
+ if (this.started && this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(workerNodeKey)
}
})
)
workerInfo.ready = message.ready as boolean
workerInfo.taskFunctionNames = message.taskFunctionNames
- if (this.emitter != null && this.ready) {
- this.emitter.emit(PoolEvents.ready, this.info)
+ if (this.ready) {
+ this.emitter?.emit(PoolEvents.ready, this.info)
}
}