- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
- [`pool.destroy()`](#pooldestroy)
- - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
+ - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
- [`PoolOptions`](#pooloptions)
- [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
- [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
- [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
- [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
- [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
- - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions)
+ - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
- [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
## Pool
This method is available on both pool implementations and will call the terminate method on each worker.
-### `pool.listTaskFunctions()`
+### `pool.listTaskFunctionNames()`
This method is available on both pool implementations and returns an array of the task function names.
This method is available on both worker implementations and returns a boolean.
-#### `YourWorker.listTaskFunctions()`
+#### `YourWorker.listTaskFunctionNames()`
This method is available on both worker implementations and returns an array of the task function names.
MessageValue,
PromiseResponseWrapper,
Task,
- TaskError,
+ WorkerError,
TaskPerformance,
WorkerStatistics,
Writable
updateMeasurementStatistics
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
PoolEmitter,
}
}
+ private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
+ let messagesCount = 0
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.sendToWorker(workerNodeKey, {
+ ...message,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number
+ })
+ ++messagesCount
+ }
+ return messagesCount
+ }
+
/** @inheritDoc */
- public listTaskFunctions (): string[] {
+ public hasTaskFunction (name: string): boolean {
+ this.sendToWorkers({
+ taskFunctionOperation: 'has',
+ taskFunctionName: name
+ })
+ return true
+ }
+
+ /** @inheritDoc */
+ public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
+ this.sendToWorkers({
+ taskFunctionOperation: 'add',
+ taskFunctionName: name,
+ taskFunction: taskFunction.toString()
+ })
+ return true
+ }
+
+ /** @inheritDoc */
+ public removeTaskFunction (name: string): boolean {
+ this.sendToWorkers({
+ taskFunctionOperation: 'remove',
+ taskFunctionName: name
+ })
+ return true
+ }
+
+ /** @inheritDoc */
+ public listTaskFunctionNames (): string[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctions) &&
- workerNode.info.taskFunctions.length > 0
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.length > 0
) {
- return workerNode.info.taskFunctions
+ return workerNode.info.taskFunctionNames
}
}
return []
}
+ /** @inheritDoc */
+ public setDefaultTaskFunction (name: string): boolean {
+ this.sendToWorkers({
+ taskFunctionOperation: 'default',
+ taskFunctionName: name
+ })
+ return true
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctions) &&
- workerInfo.taskFunctions.length > 2
+ Array.isArray(workerInfo.taskFunctionNames) &&
+ workerInfo.taskFunctionNames.length > 2
)
}
) {
--workerTaskStatistics.executing
}
- if (message.taskError == null) {
+ if (message.workerError == null) {
++workerTaskStatistics.executed
} else {
++workerTaskStatistics.failed
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
updateMeasurementStatistics(
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctions != null) {
+ if (message.ready != null && message.taskFunctionNames != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctions != null) {
- // Task functions message received from worker
+ } else if (message.taskFunctionNames != null) {
+ // Task function names message received from worker
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctions = message.taskFunctions
+ ).taskFunctionNames = message.taskFunctionNames
+ } else if (message.taskFunctionOperation != null) {
+ // Task function operation response received from worker
}
}
}
this.getWorkerNodeKeyByWorkerId(message.workerId)
)
workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctions = message.taskFunctions
+ workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, taskError, data } = message
+ const { taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, taskError)
- promiseResponse.reject(taskError.message)
+ if (workerError != null) {
+ this.emitter?.emit(PoolEvents.taskError, workerError)
+ promiseResponse.reject(workerError.message)
} else {
promiseResponse.resolve(data as Response)
}
import { EventEmitter } from 'node:events'
import { type TransferListItem } from 'node:worker_threads'
+import type { TaskFunction } from '../worker/task-functions'
import type {
ErrorHandler,
ExitHandler,
* Terminates all workers in this pool.
*/
readonly destroy: () => Promise<void>
+ /**
+ * Whether the specified task function exists in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function exists, `false` otherwise.
+ */
+ readonly hasTaskFunction: (name: string) => boolean
+ /**
+ * Adds a task function to this pool.
+ * If a task function with the same name already exists, it will be overwritten.
+ *
+ * @param name - The name of the task function.
+ * @param taskFunction - The task function.
+ * @returns `true` if the task function was added, `false` otherwise.
+ */
+ readonly addTaskFunction: (
+ name: string,
+ taskFunction: TaskFunction
+ ) => boolean
+ /**
+ * Removes a task function from this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function was removed, `false` otherwise.
+ */
+ readonly removeTaskFunction: (name: string) => boolean
/**
* Lists the names of task function available in this pool.
*
* @returns The names of task function available in this pool.
*/
- readonly listTaskFunctions: () => string[]
+ readonly listTaskFunctionNames: () => string[]
+ /**
+ * Sets the default task function in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the default task function was set, `false` otherwise.
+ */
+ readonly setDefaultTaskFunction: (name: string) => boolean
/**
* Sets the worker choice strategy in this pool.
*
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
- if (!Array.isArray(this.info.taskFunctions)) {
+ if (!Array.isArray(this.info.taskFunctionNames)) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
)
}
if (
- Array.isArray(this.info.taskFunctions) &&
- this.info.taskFunctions.length < 3
+ Array.isArray(this.info.taskFunctionNames) &&
+ this.info.taskFunctionNames.length < 3
) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
- name = this.info.taskFunctions[1]
+ name = this.info.taskFunctionNames[1]
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
for (const task of this.tasksQueue) {
if (
(task.name === DEFAULT_TASK_NAME &&
- name === (this.info.taskFunctions as string[])[1]) ||
+ name === (this.info.taskFunctionNames as string[])[1]) ||
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize
/**
* Task function names.
*/
- taskFunctions?: string[]
+ taskFunctionNames?: string[]
}
/**
import type { KillBehavior } from './worker/worker-options'
/**
- * Task error.
+ * Worker error.
*
* @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
*/
-export interface TaskError<Data = unknown> {
+export interface WorkerError<Data = unknown> {
/**
- * Task name triggering the error.
+ * Task function name triggering the error.
*/
readonly name: string
/**
*/
readonly kill?: KillBehavior | true | 'success' | 'failure'
/**
- * Task error.
+ * Worker error.
*/
- readonly taskError?: TaskError<ErrorData>
+ readonly workerError?: WorkerError<ErrorData>
/**
* Task performance.
*/
readonly taskPerformance?: TaskPerformance
+ /**
+ * Task function operation:
+ * - `'has'` - Check if a task function exists.
+ * - `'add'` - Add a task function.
+ * - `'delete'` - Delete a task function.
+ * - `'default'` - Set a task function as default.
+ */
+ readonly taskFunctionOperation?: 'has' | 'add' | 'remove' | 'default'
+ readonly taskFunctionOperationStatus?: boolean
+ /**
+ * Task function serialized to string.
+ */
+ readonly taskFunction?: string
+ /**
+ * Task function name.
+ */
+ readonly taskFunctionName?: string
/**
* Task function names.
*/
- readonly taskFunctions?: string[]
+ readonly taskFunctionNames?: string[]
/**
* Whether the worker computes the given statistics or not.
*/
TaskSyncFunction
} from './task-functions'
+interface TaskFunctionOperationReturnType {
+ status: boolean
+ error?: Error
+}
+
const DEFAULT_MAX_INACTIVE_TIME = 60000
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
/**
*
* @param name - The name of the task function to check.
* @returns Whether the worker has a task function with the given name or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
*/
- public hasTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- return this.taskFunctions.has(name)
+ public hasTaskFunction (name: string): TaskFunctionOperationReturnType {
+ try {
+ this.checkTaskFunctionName(name)
+ } catch (error) {
+ return { status: false, error: error as Error }
+ }
+ return { status: this.taskFunctions.has(name) }
}
/**
* @param name - The name of the task function to add.
* @param fn - The task function to add.
* @returns Whether the task function was added or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
*/
public addTaskFunction (
name: string,
fn: TaskFunction<Data, Response>
- ): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot add a task function with the default reserved name'
- )
- }
- if (typeof fn !== 'function') {
- throw new TypeError('fn parameter is not a function')
- }
+ ): TaskFunctionOperationReturnType {
try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot add a task function with the default reserved name'
+ )
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError('fn parameter is not a function')
+ }
const boundFn = fn.bind(this)
if (
this.taskFunctions.get(name) ===
}
this.taskFunctions.set(name, boundFn)
this.sendTaskFunctionsListToMainWorker()
- return true
- } catch {
- return false
+ return { status: true }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
}
*
* @param name - The name of the task function to remove.
* @returns Whether the task function existed and was removed or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function.
*/
- public removeTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot remove the task function with the default reserved name'
- )
- }
- if (
- this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME)
- ) {
- throw new Error(
- 'Cannot remove the task function used as the default task function'
- )
+ public removeTaskFunction (name: string): TaskFunctionOperationReturnType {
+ try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot remove the task function with the default reserved name'
+ )
+ }
+ if (
+ this.taskFunctions.get(name) ===
+ this.taskFunctions.get(DEFAULT_TASK_NAME)
+ ) {
+ throw new Error(
+ 'Cannot remove the task function used as the default task function'
+ )
+ }
+ const deleteStatus = this.taskFunctions.delete(name)
+ this.sendTaskFunctionsListToMainWorker()
+ return { status: deleteStatus }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
- const deleteStatus = this.taskFunctions.delete(name)
- this.sendTaskFunctionsListToMainWorker()
- return deleteStatus
}
/**
*
* @returns The names of the worker's task functions.
*/
- public listTaskFunctions (): string[] {
+ public listTaskFunctionNames (): string[] {
const names: string[] = [...this.taskFunctions.keys()]
let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
for (const [name, fn] of this.taskFunctions) {
*
* @param name - The name of the task function to use as default task function.
* @returns Whether the default task function was set or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function.
*/
- public setDefaultTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot set the default task function reserved name as the default task function'
- )
- }
- if (!this.taskFunctions.has(name)) {
- throw new Error(
- 'Cannot set the default task function to a non-existing task function'
- )
- }
+ public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType {
try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot set the default task function reserved name as the default task function'
+ )
+ }
+ if (!this.taskFunctions.has(name)) {
+ throw new Error(
+ 'Cannot set the default task function to a non-existing task function'
+ )
+ }
this.taskFunctions.set(
DEFAULT_TASK_NAME,
this.taskFunctions.get(name) as TaskFunction<Data, Response>
)
- return true
- } catch {
- return false
+ return { status: true }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
}
} else if (message.checkActive != null) {
// Check active message received
message.checkActive ? this.startCheckActive() : this.stopCheckActive()
+ } else if (message.taskFunctionOperation != null) {
+ // Task function operation message received
+ this.handleTaskFunctionOperationMessage(message)
} else if (message.taskId != null && message.data != null) {
// Task message received
this.run(message)
}
}
+ protected handleTaskFunctionOperationMessage (
+ message: MessageValue<Data>
+ ): void {
+ const { taskFunctionOperation, taskFunction, taskFunctionName } = message
+ let response!: TaskFunctionOperationReturnType
+ if (taskFunctionOperation === 'has') {
+ response = this.hasTaskFunction(taskFunctionName as string)
+ } else if (taskFunctionOperation === 'add') {
+ response = this.addTaskFunction(
+ taskFunctionName as string,
+ // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
+ new Function(`return ${taskFunction as string}`)() as TaskFunction<
+ Data,
+ Response
+ >
+ )
+ } else if (taskFunctionOperation === 'remove') {
+ response = this.removeTaskFunction(taskFunctionName as string)
+ } else if (taskFunctionOperation === 'default') {
+ response = this.setDefaultTaskFunction(taskFunctionName as string)
+ }
+ this.sendToMainWorker({
+ taskFunctionOperation,
+ taskFunctionOperationStatus: response.status,
+ workerError: {
+ name: taskFunctionName as string,
+ message: this.handleError(response.error as Error | string)
+ },
+ workerId: this.id
+ })
+ }
+
/**
* Handles a kill message sent by the main worker.
*
*/
protected sendTaskFunctionsListToMainWorker (): void {
this.sendToMainWorker({
- taskFunctions: this.listTaskFunctions(),
+ taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id
})
}
/**
* Handles an error and convert it to a string so it can be sent back to the main worker.
*
- * @param e - The error raised by the worker.
+ * @param error - The error raised by the worker.
* @returns The error message.
*/
- protected handleError (e: Error | string): string {
- return e instanceof Error ? e.message : e
+ protected handleError (error: Error | string): string {
+ return error instanceof Error ? error.message : error
}
/**
const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
if (fn == null) {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
message: `Task function '${name as string}' not found`,
data
workerId: this.id,
taskId
})
- } catch (e) {
- const errorMessage = this.handleError(e as Error | string)
+ } catch (error) {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
- message: errorMessage,
+ message: this.handleError(error as Error | string),
data
},
workerId: this.id,
})
return null
})
- .catch(e => {
- const errorMessage = this.handleError(e as Error | string)
+ .catch(error => {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
- message: errorMessage,
+ message: this.handleError(error as Error | string),
data
},
workerId: this.id,
this.getMainWorker().on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctions: this.listTaskFunctions(),
+ taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctions: this.listTaskFunctions(),
+ taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id
})
}
this.port.on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctions: this.listTaskFunctions(),
+ taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctions: this.listTaskFunctions(),
+ taskFunctionNames: this.listTaskFunctionNames(),
workerId: this.id
})
}