### Changed
- Disable publication on GitHub packages registry on release until authentication issue is fixed.
+- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API.
### Added
+- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
- Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
- Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
- Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
- [`pool.start()`](#poolstart)
- [`pool.destroy()`](#pooldestroy)
- - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
+ - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
- [`PoolOptions`](#pooloptions)
- [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
- [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
- [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
- [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
- [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
- - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions)
+ - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
- [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
## Pool
This method is available on both pool implementations and will call the terminate method on each worker.
-### `pool.listTaskFunctions()`
+### `pool.listTaskFunctionNames()`
This method is available on both pool implementations and returns an array of the task function names.
This method is available on both worker implementations and returns a boolean.
-#### `YourWorker.listTaskFunctions()`
+#### `YourWorker.listTaskFunctionNames()`
This method is available on both worker implementations and returns an array of the task function names.
export type {
TaskAsyncFunction,
TaskFunction,
+ TaskFunctionOperationReturnType,
TaskFunctions,
TaskSyncFunction
} from './worker/task-functions'
MessageValue,
PromiseResponseWrapper,
Task,
- TaskError,
+ WorkerError,
TaskPerformance,
WorkerStatistics,
Writable
updateMeasurementStatistics
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
PoolEmitter,
*/
protected readonly max?: number
+ /**
+ * The task functions added at runtime map:
+ * - `key`: The task function name.
+ * - `value`: The task function itself.
+ */
+ private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
/**
* Whether the pool is started or not.
*/
this.setupHook()
+ this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
this.started = false
this.starting = false
if (this.opts.startWorkers === true) {
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerNodeKeyByWorkerId (workerId: number): number {
+ private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
return this.workerNodes.findIndex(
workerNode => workerNode.info.id === workerId
)
)
}
+ private async sendTaskFunctionOperationToWorker (
+ workerNodeKey: number,
+ message: MessageValue<Data>
+ ): Promise<boolean> {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ return await new Promise<boolean>((resolve, reject) => {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === true
+ ) {
+ resolve(true)
+ } else if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === false
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ })
+ this.sendToWorker(workerNodeKey, message)
+ })
+ }
+
+ private async sendTaskFunctionOperationToWorkers (
+ message: Omit<MessageValue<Data>, 'workerId'>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Data | Response>>()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId as number}`
+ )
+ )
+ }
+ }
+ })
+ this.sendToWorker(workerNodeKey, message)
+ }
+ })
+ }
+
+ /** @inheritDoc */
+ public hasTaskFunction (name: string): boolean {
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.includes(name)
+ ) {
+ return true
+ }
+ }
+ return false
+ }
+
+ /** @inheritDoc */
+ public async addTaskFunction (
+ name: string,
+ taskFunction: TaskFunction<Data, Response>
+ ): Promise<boolean> {
+ this.taskFunctions.set(name, taskFunction)
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'add',
+ taskFunctionName: name,
+ taskFunction: taskFunction.toString()
+ })
+ }
+
+ /** @inheritDoc */
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ this.taskFunctions.delete(name)
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'remove',
+ taskFunctionName: name
+ })
+ }
+
/** @inheritDoc */
- public listTaskFunctions (): string[] {
+ public listTaskFunctionNames (): string[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctions) &&
- workerNode.info.taskFunctions.length > 0
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.length > 0
) {
- return workerNode.info.taskFunctions
+ return workerNode.info.taskFunctionNames
}
}
return []
}
+ /** @inheritDoc */
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'default',
+ taskFunctionName: name
+ })
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
}
protected async sendKillMessageToWorker (
- workerNodeKey: number,
- workerId: number
+ workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.registerWorkerMessageListener(workerNodeKey, message => {
if (message.kill === 'success') {
resolve()
} else if (message.kill === 'failure') {
- reject(new Error(`Worker ${workerId} kill message handling failed`))
+ reject(
+ new Error(
+ `Worker ${
+ message.workerId as number
+ } kill message handling failed`
+ )
+ )
}
})
- this.sendToWorker(workerNodeKey, { kill: true, workerId })
+ this.sendToWorker(workerNodeKey, { kill: true })
})
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctions) &&
- workerInfo.taskFunctions.length > 2
+ Array.isArray(workerInfo.taskFunctionNames) &&
+ workerInfo.taskFunctionNames.length > 2
)
}
) {
--workerTaskStatistics.executing
}
- if (message.taskError == null) {
+ if (message.workerError == null) {
++workerTaskStatistics.executed
} else {
++workerTaskStatistics.failed
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
updateMeasurementStatistics(
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
})
const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
- checkActive: true,
- workerId: workerInfo.id as number
+ checkActive: true
})
+ if (this.taskFunctions.size > 0) {
+ for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+ taskFunctionOperation: 'add',
+ taskFunctionName,
+ taskFunction: taskFunction.toString()
+ }).catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ }
workerInfo.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- },
- workerId: this.getWorkerInfo(workerNodeKey).id as number
+ }
})
}
},
0
)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
- const task = {
- ...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = this.dequeueTask(workerNodeKey) as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
private taskStealingOnEmptyQueue (workerId: number): void {
const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const workerNodes = this.workerNodes
.slice()
.sort(
workerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: workerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
} else {
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctions != null) {
+ if (message.ready != null && message.taskFunctionNames != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctions != null) {
- // Task functions message received from worker
+ } else if (message.taskFunctionNames != null) {
+ // Task function names message received from worker
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctions = message.taskFunctions
+ ).taskFunctionNames = message.taskFunctionNames
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
if (message.ready === false) {
- throw new Error(`Worker ${message.workerId} failed to initialize`)
+ throw new Error(
+ `Worker ${message.workerId as number} failed to initialize`
+ )
}
const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
)
workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctions = message.taskFunctions
+ workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, taskError, data } = message
+ const { taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, taskError)
- promiseResponse.reject(taskError.message)
+ if (workerError != null) {
+ this.emitter?.emit(PoolEvents.taskError, workerError)
+ promiseResponse.reject(workerError.message)
} else {
promiseResponse.resolve(data as Response)
}
worker.on('disconnect', () => {
worker.kill()
})
- await this.sendKillMessageToWorker(
- workerNodeKey,
- workerNode.info.id as number
- )
+ await this.sendKillMessageToWorker(workerNodeKey)
worker.disconnect()
await waitWorkerExit
}
workerNodeKey: number,
message: MessageValue<Data>
): void {
- this.workerNodes[workerNodeKey].worker.send(message)
+ this.workerNodes[workerNodeKey].worker.send({
+ ...message,
+ workerId: this.workerNodes[workerNodeKey].info.id as number
+ })
}
/** @inheritDoc */
protected sendStartupMessageToWorker (workerNodeKey: number): void {
this.sendToWorker(workerNodeKey, {
- ready: false,
- workerId: this.workerNodes[workerNodeKey].info.id as number
+ ready: false
})
}
import { EventEmitter } from 'node:events'
import { type TransferListItem } from 'node:worker_threads'
+import type { TaskFunction } from '../worker/task-functions'
import type {
ErrorHandler,
ExitHandler,
* Terminates all workers in this pool.
*/
readonly destroy: () => Promise<void>
+ /**
+ * Whether the specified task function exists in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function exists, `false` otherwise.
+ */
+ readonly hasTaskFunction: (name: string) => boolean
+ /**
+ * Adds a task function to this pool.
+ * If a task function with the same name already exists, it will be overwritten.
+ *
+ * @param name - The name of the task function.
+ * @param taskFunction - The task function.
+ * @returns `true` if the task function was added, `false` otherwise.
+ */
+ readonly addTaskFunction: (
+ name: string,
+ taskFunction: TaskFunction<Data, Response>
+ ) => Promise<boolean>
+ /**
+ * Removes a task function from this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function was removed, `false` otherwise.
+ */
+ readonly removeTaskFunction: (name: string) => Promise<boolean>
/**
* Lists the names of task function available in this pool.
*
* @returns The names of task function available in this pool.
*/
- readonly listTaskFunctions: () => string[]
+ readonly listTaskFunctionNames: () => string[]
+ /**
+ * Sets the default task function in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the default task function was set, `false` otherwise.
+ */
+ readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
/**
* Sets the worker choice strategy in this pool.
*
resolve()
})
})
- await this.sendKillMessageToWorker(
- workerNodeKey,
- workerNode.info.id as number
- )
+ await this.sendKillMessageToWorker(workerNodeKey)
workerNode.closeChannel()
await worker.terminate()
await waitWorkerExit
): void {
(
this.workerNodes[workerNodeKey].messageChannel as MessageChannel
- ).port1.postMessage(message, transferList)
+ ).port1.postMessage(
+ { ...message, workerId: this.workerNodes[workerNodeKey].info.id },
+ transferList
+ )
}
/** @inheritDoc */
protected sendStartupMessageToWorker (workerNodeKey: number): void {
const workerNode = this.workerNodes[workerNodeKey]
- const worker = workerNode.worker
const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
.port2
- worker.postMessage(
+ workerNode.worker.postMessage(
{
ready: false,
workerId: workerNode.info.id,
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
- if (!Array.isArray(this.info.taskFunctions)) {
+ if (!Array.isArray(this.info.taskFunctionNames)) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
)
}
if (
- Array.isArray(this.info.taskFunctions) &&
- this.info.taskFunctions.length < 3
+ Array.isArray(this.info.taskFunctionNames) &&
+ this.info.taskFunctionNames.length < 3
) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
- name = this.info.taskFunctions[1]
+ name = this.info.taskFunctionNames[1]
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
for (const task of this.tasksQueue) {
if (
(task.name === DEFAULT_TASK_NAME &&
- name === (this.info.taskFunctions as string[])[1]) ||
+ name === (this.info.taskFunctionNames as string[])[1]) ||
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize
/**
* Task function names.
*/
- taskFunctions?: string[]
+ taskFunctionNames?: string[]
}
/**
import type { KillBehavior } from './worker/worker-options'
/**
- * Task error.
+ * Worker error.
*
* @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
*/
-export interface TaskError<Data = unknown> {
+export interface WorkerError<Data = unknown> {
/**
- * Task name triggering the error.
+ * Task function name triggering the error.
*/
readonly name: string
/**
/**
* Worker id.
*/
- readonly workerId: number
+ readonly workerId?: number
/**
* Task name.
*/
*/
readonly kill?: KillBehavior | true | 'success' | 'failure'
/**
- * Task error.
+ * Worker error.
*/
- readonly taskError?: TaskError<ErrorData>
+ readonly workerError?: WorkerError<ErrorData>
/**
* Task performance.
*/
readonly taskPerformance?: TaskPerformance
+ /**
+ * Task function operation:
+ * - `'add'` - Add a task function.
+ * - `'delete'` - Delete a task function.
+ * - `'default'` - Set a task function as default.
+ */
+ readonly taskFunctionOperation?: 'add' | 'remove' | 'default'
+ /**
+ * Whether the task function operation is successful or not.
+ */
+ readonly taskFunctionOperationStatus?: boolean
+ /**
+ * Task function serialized to string.
+ */
+ readonly taskFunction?: string
+ /**
+ * Task function name.
+ */
+ readonly taskFunctionName?: string
/**
* Task function names.
*/
- readonly taskFunctions?: string[]
+ readonly taskFunctionNames?: string[]
/**
* Whether the worker computes the given statistics or not.
*/
import type {
TaskAsyncFunction,
TaskFunction,
+ TaskFunctionOperationReturnType,
TaskFunctions,
TaskSyncFunction
} from './task-functions'
*
* @param name - The name of the task function to check.
* @returns Whether the worker has a task function with the given name or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
*/
- public hasTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- return this.taskFunctions.has(name)
+ public hasTaskFunction (name: string): TaskFunctionOperationReturnType {
+ try {
+ this.checkTaskFunctionName(name)
+ } catch (error) {
+ return { status: false, error: error as Error }
+ }
+ return { status: this.taskFunctions.has(name) }
}
/**
* @param name - The name of the task function to add.
* @param fn - The task function to add.
* @returns Whether the task function was added or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
*/
public addTaskFunction (
name: string,
fn: TaskFunction<Data, Response>
- ): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot add a task function with the default reserved name'
- )
- }
- if (typeof fn !== 'function') {
- throw new TypeError('fn parameter is not a function')
- }
+ ): TaskFunctionOperationReturnType {
try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot add a task function with the default reserved name'
+ )
+ }
+ if (typeof fn !== 'function') {
+ throw new TypeError('fn parameter is not a function')
+ }
const boundFn = fn.bind(this)
if (
this.taskFunctions.get(name) ===
this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
}
this.taskFunctions.set(name, boundFn)
- this.sendTaskFunctionsListToMainWorker()
- return true
- } catch {
- return false
+ this.sendTaskFunctionNamesToMainWorker()
+ return { status: true }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
}
*
* @param name - The name of the task function to remove.
* @returns Whether the task function existed and was removed or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function.
*/
- public removeTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot remove the task function with the default reserved name'
- )
- }
- if (
- this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME)
- ) {
- throw new Error(
- 'Cannot remove the task function used as the default task function'
- )
+ public removeTaskFunction (name: string): TaskFunctionOperationReturnType {
+ try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot remove the task function with the default reserved name'
+ )
+ }
+ if (
+ this.taskFunctions.get(name) ===
+ this.taskFunctions.get(DEFAULT_TASK_NAME)
+ ) {
+ throw new Error(
+ 'Cannot remove the task function used as the default task function'
+ )
+ }
+ const deleteStatus = this.taskFunctions.delete(name)
+ this.sendTaskFunctionNamesToMainWorker()
+ return { status: deleteStatus }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
- const deleteStatus = this.taskFunctions.delete(name)
- this.sendTaskFunctionsListToMainWorker()
- return deleteStatus
}
/**
*
* @returns The names of the worker's task functions.
*/
- public listTaskFunctions (): string[] {
+ public listTaskFunctionNames (): string[] {
const names: string[] = [...this.taskFunctions.keys()]
let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
for (const [name, fn] of this.taskFunctions) {
*
* @param name - The name of the task function to use as default task function.
* @returns Whether the default task function was set or not.
- * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function.
*/
- public setDefaultTaskFunction (name: string): boolean {
- this.checkTaskFunctionName(name)
- if (name === DEFAULT_TASK_NAME) {
- throw new Error(
- 'Cannot set the default task function reserved name as the default task function'
- )
- }
- if (!this.taskFunctions.has(name)) {
- throw new Error(
- 'Cannot set the default task function to a non-existing task function'
- )
- }
+ public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType {
try {
+ this.checkTaskFunctionName(name)
+ if (name === DEFAULT_TASK_NAME) {
+ throw new Error(
+ 'Cannot set the default task function reserved name as the default task function'
+ )
+ }
+ if (!this.taskFunctions.has(name)) {
+ throw new Error(
+ 'Cannot set the default task function to a non-existing task function'
+ )
+ }
this.taskFunctions.set(
DEFAULT_TASK_NAME,
this.taskFunctions.get(name) as TaskFunction<Data, Response>
)
- return true
- } catch {
- return false
+ return { status: true }
+ } catch (error) {
+ return { status: false, error: error as Error }
}
}
} else if (message.checkActive != null) {
// Check active message received
message.checkActive ? this.startCheckActive() : this.stopCheckActive()
+ } else if (message.taskFunctionOperation != null) {
+ // Task function operation message received
+ this.handleTaskFunctionOperationMessage(message)
} else if (message.taskId != null && message.data != null) {
// Task message received
this.run(message)
}
}
+ protected handleTaskFunctionOperationMessage (
+ message: MessageValue<Data>
+ ): void {
+ const { taskFunctionOperation, taskFunction, taskFunctionName } = message
+ let response!: TaskFunctionOperationReturnType
+ if (taskFunctionOperation === 'add') {
+ response = this.addTaskFunction(
+ taskFunctionName as string,
+ // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
+ new Function(`return ${taskFunction as string}`)() as TaskFunction<
+ Data,
+ Response
+ >
+ )
+ } else if (taskFunctionOperation === 'remove') {
+ response = this.removeTaskFunction(taskFunctionName as string)
+ } else if (taskFunctionOperation === 'default') {
+ response = this.setDefaultTaskFunction(taskFunctionName as string)
+ }
+ this.sendToMainWorker({
+ taskFunctionOperation,
+ taskFunctionOperationStatus: response.status,
+ workerError: {
+ name: taskFunctionName as string,
+ message: this.handleError(response.error as Error | string)
+ }
+ })
+ }
+
/**
* Handles a kill message sent by the main worker.
*
if (isAsyncFunction(this.opts.killHandler)) {
(this.opts.killHandler?.() as Promise<void>)
.then(() => {
- this.sendToMainWorker({ kill: 'success', workerId: this.id })
+ this.sendToMainWorker({ kill: 'success' })
return null
})
.catch(() => {
- this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+ this.sendToMainWorker({ kill: 'failure' })
})
.finally(() => {
this.emitDestroy()
try {
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
this.opts.killHandler?.() as void
- this.sendToMainWorker({ kill: 'success', workerId: this.id })
+ this.sendToMainWorker({ kill: 'success' })
} catch {
- this.sendToMainWorker({ kill: 'failure', workerId: this.id })
+ this.sendToMainWorker({ kill: 'failure' })
} finally {
this.emitDestroy()
}
performance.now() - this.lastTaskTimestamp >
(this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
) {
- this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
+ this.sendToMainWorker({ kill: this.opts.killBehavior })
}
}
): void
/**
- * Sends the list of task function names to the main worker.
+ * Sends task function names to the main worker.
*/
- protected sendTaskFunctionsListToMainWorker (): void {
+ protected sendTaskFunctionNamesToMainWorker (): void {
this.sendToMainWorker({
- taskFunctions: this.listTaskFunctions(),
- workerId: this.id
+ taskFunctionNames: this.listTaskFunctionNames()
})
}
const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
if (fn == null) {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
message: `Task function '${name as string}' not found`,
data
},
- workerId: this.id,
taskId
})
return
this.sendToMainWorker({
data: res,
taskPerformance,
- workerId: this.id,
taskId
})
} catch (error) {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
message: this.handleError(error as Error | string),
data
},
- workerId: this.id,
taskId
})
} finally {
this.sendToMainWorker({
data: res,
taskPerformance,
- workerId: this.id,
taskId
})
return null
})
.catch(error => {
this.sendToMainWorker({
- taskError: {
+ workerError: {
name: name as string,
message: this.handleError(error as Error | string),
data
},
- workerId: this.id,
taskId
})
})
this.getMainWorker().on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctions: this.listTaskFunctions(),
- workerId: this.id
+ taskFunctionNames: this.listTaskFunctionNames()
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctions: this.listTaskFunctions(),
- workerId: this.id
+ taskFunctionNames: this.listTaskFunctionNames()
})
}
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
- this.getMainWorker().send(message)
+ this.getMainWorker().send({ ...message, workerId: this.id })
}
}
string,
TaskFunction<Data, Response>
>
+
+/**
+ * Task function operation return type.
+ */
+export interface TaskFunctionOperationReturnType {
+ status: boolean
+ error?: Error
+}
this.port.on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctions: this.listTaskFunctions(),
- workerId: this.id
+ taskFunctionNames: this.listTaskFunctionNames()
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctions: this.listTaskFunctions(),
- workerId: this.id
+ taskFunctionNames: this.listTaskFunctionNames()
})
}
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
- this.port.postMessage(message)
+ this.port.postMessage({ ...message, workerId: this.id })
}
/** @inheritDoc */
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'./tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
+ expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctions).toStrictEqual([
+ expect(workerNode.info.taskFunctionNames).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'fibonacci'
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctions()) {
+ for (const name of pool.listTaskFunctionNames()) {
expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
tasks: {
executed: expect.any(Number),
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
- workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionNames[1]
+ )
)
}
await pool.destroy()
"Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
)
)
- threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
+ threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
expect(() =>
threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
).toThrowError(
"Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
)
)
- threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+ threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
expect(
threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual({
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
- expect(() => worker.hasTaskFunction(0)).toThrowError(
- new TypeError('name parameter is not a string')
- )
- expect(() => worker.hasTaskFunction('')).toThrowError(
- new TypeError('name parameter is an empty string')
- )
- expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
- expect(worker.hasTaskFunction('fn1')).toBe(true)
- expect(worker.hasTaskFunction('fn2')).toBe(true)
- expect(worker.hasTaskFunction('fn3')).toBe(false)
+ expect(worker.hasTaskFunction(0)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is not a string')
+ })
+ expect(worker.hasTaskFunction('')).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is an empty string')
+ })
+ expect(worker.hasTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+ status: true
+ })
+ expect(worker.hasTaskFunction('fn1')).toStrictEqual({ status: true })
+ expect(worker.hasTaskFunction('fn2')).toStrictEqual({ status: true })
+ expect(worker.hasTaskFunction('fn3')).toStrictEqual({ status: false })
})
it('Verify that addTaskFunction() works', () => {
return 3
}
const worker = new ThreadWorker(fn1)
- expect(() => worker.addTaskFunction(0, fn1)).toThrowError(
- new TypeError('name parameter is not a string')
- )
- expect(() => worker.addTaskFunction('', fn1)).toThrowError(
- new TypeError('name parameter is an empty string')
- )
- expect(() => worker.addTaskFunction('fn3', '')).toThrowError(
- new TypeError('fn parameter is not a function')
- )
+ expect(worker.addTaskFunction(0, fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is not a string')
+ })
+ expect(worker.addTaskFunction('', fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is an empty string')
+ })
+ expect(worker.addTaskFunction('fn3', '')).toStrictEqual({
+ status: false,
+ error: new TypeError('fn parameter is not a function')
+ })
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
expect(worker.taskFunctions.size).toBe(2)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
- expect(() => worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toThrowError(
- new Error('Cannot add a task function with the default reserved name')
- )
+ expect(worker.addTaskFunction(DEFAULT_TASK_NAME, fn2)).toStrictEqual({
+ status: false,
+ error: new Error(
+ 'Cannot add a task function with the default reserved name'
+ )
+ })
worker.addTaskFunction('fn2', fn2)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
- expect(() => worker.removeTaskFunction(0, fn1)).toThrowError(
- new TypeError('name parameter is not a string')
- )
- expect(() => worker.removeTaskFunction('', fn1)).toThrowError(
- new TypeError('name parameter is an empty string')
- )
+ expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is not a string')
+ })
+ expect(worker.removeTaskFunction('', fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is an empty string')
+ })
worker.getMainWorker = sinon.stub().returns({
id: 1,
send: sinon.stub().returns()
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
- expect(() => worker.removeTaskFunction(DEFAULT_TASK_NAME)).toThrowError(
- new Error(
+ expect(worker.removeTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+ status: false,
+ error: new Error(
'Cannot remove the task function with the default reserved name'
)
- )
- expect(() => worker.removeTaskFunction('fn1')).toThrowError(
- new Error(
+ })
+ expect(worker.removeTaskFunction('fn1')).toStrictEqual({
+ status: false,
+ error: new Error(
'Cannot remove the task function used as the default task function'
)
- )
+ })
worker.removeTaskFunction('fn2')
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
- expect(worker.listTaskFunctions()).toStrictEqual([
+ expect(worker.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'fn1',
'fn2'
return 2
}
const worker = new ThreadWorker({ fn1, fn2 })
- expect(() => worker.setDefaultTaskFunction(0, fn1)).toThrowError(
- new TypeError('name parameter is not a string')
- )
- expect(() => worker.setDefaultTaskFunction('', fn1)).toThrowError(
- new TypeError('name parameter is an empty string')
- )
+ expect(worker.setDefaultTaskFunction(0, fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is not a string')
+ })
+ expect(worker.setDefaultTaskFunction('', fn1)).toStrictEqual({
+ status: false,
+ error: new TypeError('name parameter is an empty string')
+ })
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
- expect(() => worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toThrowError(
- new Error(
+ expect(worker.setDefaultTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+ status: false,
+ error: new Error(
'Cannot set the default task function reserved name as the default task function'
)
- )
- expect(() => worker.setDefaultTaskFunction('fn3')).toThrowError(
- new Error(
+ })
+ expect(worker.setDefaultTaskFunction('fn3')).toStrictEqual({
+ status: false,
+ error: new Error(
'Cannot set the default task function to a non-existing task function'
)
- )
+ })
worker.setDefaultTaskFunction('fn1')
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')