Task,
TaskFunctionProperties,
TaskPerformance,
- WorkerStatistics
+ WorkerStatistics,
} from '../utility-types.js'
import {
buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
isAsyncFunction,
- isPlainObject
+ isPlainObject,
} from '../utils.js'
import type {
TaskAsyncFunction,
TaskFunctionObject,
TaskFunctionOperationResult,
TaskFunctions,
- TaskSyncFunction
+ TaskSyncFunction,
} from './task-functions.js'
import {
checkTaskFunctionName,
- checkValidTaskFunctionEntry,
- checkValidWorkerOptions
+ checkValidTaskFunctionObjectEntry,
+ checkValidWorkerOptions,
} from './utils.js'
import { KillBehaviors, type WorkerOptions } from './worker-options.js'
/**
* The function to call when the worker is killed.
*/
- killHandler: EMPTY_FUNCTION
+ killHandler: EMPTY_FUNCTION,
}
/**
* Base class that implements some shared logic for all poolifier workers.
- *
* @typeParam MainWorker - Type of main worker.
* @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
* @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
/**
* Constructs a new poolifier worker.
- *
* @param isMain - Whether this is the main worker or not.
* @param mainWorker - Reference to main worker.
* @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
/**
* Checks if the `taskFunctions` parameter is passed to the constructor and valid.
- *
* @param taskFunctions - The task function(s) parameter that should be checked.
*/
private checkTaskFunctions (
taskFunctions:
- | TaskFunction<Data, Response>
- | TaskFunctions<Data, Response>
- | undefined
+ | TaskFunction<Data, Response>
+ | TaskFunctions<Data, Response>
+ | undefined
): void {
if (taskFunctions == null) {
throw new Error('taskFunctions parameter is mandatory')
for (let [name, fnObj] of Object.entries(taskFunctions)) {
if (typeof fnObj === 'function') {
fnObj = { taskFunction: fnObj } satisfies TaskFunctionObject<
- Data,
- Response
+ Data,
+ Response
>
}
- checkValidTaskFunctionEntry<Data, Response>(name, fnObj)
+ checkValidTaskFunctionObjectEntry<Data, Response>(name, fnObj)
fnObj.taskFunction = fnObj.taskFunction.bind(this)
if (firstEntry) {
this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
/**
* Checks if the worker has a task function with the given name.
- *
* @param name - The name of the task function to check.
* @returns Whether the worker has a task function with the given name or not.
*/
/**
* Adds a task function to the worker.
* If a task function with the same name already exists, it is replaced.
- *
* @param name - The name of the task function to add.
* @param fn - The task function to add.
* @returns Whether the task function was added or not.
if (typeof fn === 'function') {
fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
}
- checkValidTaskFunctionEntry<Data, Response>(name, fn)
+ checkValidTaskFunctionObjectEntry<Data, Response>(name, fn)
fn.taskFunction = fn.taskFunction.bind(this)
if (
this.taskFunctions.get(name) ===
/**
* Removes a task function from the worker.
- *
* @param name - The name of the task function to remove.
* @returns Whether the task function existed and was removed or not.
*/
/**
* Lists the properties of the worker's task functions.
- *
* @returns The properties of the worker's task functions.
*/
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
defaultTaskFunctionName,
this.taskFunctions.get(defaultTaskFunctionName)
),
- ...taskFunctionsProperties
+ ...taskFunctionsProperties,
]
}
/**
* Sets the default task function to use in the worker.
- *
* @param name - The name of the task function to use as default task function.
* @returns Whether the default task function was set or not.
*/
/**
* Handles the ready message sent by the main worker.
- *
* @param message - The ready message.
*/
protected abstract handleReadyMessage (message: MessageValue<Data>): void
/**
* Worker message listener.
- *
* @param message - The received message.
*/
protected messageListener (message: MessageValue<Data>): void {
this.checkMessageWorkerId(message)
- if (message.statistics != null) {
+ const {
+ statistics,
+ checkActive,
+ taskFunctionOperation,
+ taskId,
+ data,
+ kill,
+ } = message
+ if (statistics != null) {
// Statistics message received
- this.statistics = message.statistics
- } else if (message.checkActive != null) {
+ this.statistics = statistics
+ } else if (checkActive != null) {
// Check active message received
- message.checkActive ? this.startCheckActive() : this.stopCheckActive()
- } else if (message.taskFunctionOperation != null) {
+ checkActive ? this.startCheckActive() : this.stopCheckActive()
+ } else if (taskFunctionOperation != null) {
// Task function operation message received
this.handleTaskFunctionOperationMessage(message)
- } else if (message.taskId != null && message.data != null) {
+ } else if (taskId != null && data != null) {
// Task message received
this.run(message)
- } else if (message.kill === true) {
+ } else if (kill === true) {
// Kill message received
this.handleKillMessage(message)
}
response = this.addTaskFunction(taskFunctionProperties.name, {
// eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
taskFunction: new Function(
- `return ${taskFunction}`
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ `return ${taskFunction!}`
)() as TaskFunction<Data, Response>,
...(taskFunctionProperties.priority != null && {
- priority: taskFunctionProperties.priority
+ priority: taskFunctionProperties.priority,
}),
...(taskFunctionProperties.strategy != null && {
- strategy: taskFunctionProperties.strategy
- })
+ strategy: taskFunctionProperties.strategy,
+ }),
})
break
case 'remove':
response = { status: false, error: new Error('Unknown task operation') }
break
}
+ const { status, error } = response
this.sendToMainWorker({
taskFunctionOperation,
- taskFunctionOperationStatus: response.status,
+ taskFunctionOperationStatus: status,
taskFunctionProperties,
- ...(!response.status &&
- response.error != null && {
+ ...(!status &&
+ error != null && {
workerError: {
name: taskFunctionProperties.name,
- message: this.handleError(response.error as Error | string)
- }
- })
+ message: this.handleError(error as Error | string),
+ },
+ }),
})
}
/**
* Handles a kill message sent by the main worker.
- *
* @param message - The kill message.
*/
- protected handleKillMessage (_message: MessageValue<Data>): void {
+ protected handleKillMessage (message: MessageValue<Data>): void {
this.stopCheckActive()
if (isAsyncFunction(this.opts.killHandler)) {
- (this.opts.killHandler() as Promise<void>)
+ ;(this.opts.killHandler as () => Promise<void>)()
.then(() => {
this.sendToMainWorker({ kill: 'success' })
return undefined
})
} else {
try {
- // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
- this.opts.killHandler?.() as void
+ ;(this.opts.killHandler as (() => void) | undefined)?.()
this.sendToMainWorker({ kill: 'success' })
} catch {
this.sendToMainWorker({ kill: 'failure' })
/**
* Check if the message worker id is set and matches the worker id.
- *
* @param message - The message to check.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
*/
throw new Error('Message worker id is not set')
} else if (message.workerId !== this.id) {
throw new Error(
- `Message worker id ${message.workerId} does not match the worker id ${this.id}`
+ `Message worker id ${message.workerId.toString()} does not match the worker id ${this.id.toString()}`
)
}
}
/**
* Returns the main worker.
- *
* @returns Reference to the main worker.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
*/
/**
* Sends a message to main worker.
- *
* @param message - The response message.
*/
protected abstract sendToMainWorker (
*/
protected sendTaskFunctionsPropertiesToMainWorker (): void {
this.sendToMainWorker({
- taskFunctionsProperties: this.listTaskFunctionsProperties()
+ taskFunctionsProperties: this.listTaskFunctionsProperties(),
})
}
/**
* Handles an error and convert it to a string so it can be sent back to the main worker.
- *
* @param error - The error raised by the worker.
* @returns The error message.
*/
/**
* Runs the given task.
- *
* @param task - The task to execute.
*/
protected readonly run = (task: Task<Data>): void => {
workerError: {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
name: name!,
- message: `Task function '${name}' not found`,
- data
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ message: `Task function '${name!}' not found`,
+ data,
},
- taskId
+ taskId,
})
return
}
/**
* Runs the given task function synchronously.
- *
* @param fn - Task function that will be executed.
* @param task - Input data for the task function.
*/
this.sendToMainWorker({
data: res,
taskPerformance,
- taskId
+ taskId,
})
} catch (error) {
this.sendToMainWorker({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
name: name!,
message: this.handleError(error as Error | string),
- data
+ data,
},
- taskId
+ taskId,
})
} finally {
this.updateLastTaskTimestamp()
/**
* Runs the given task function asynchronously.
- *
* @param fn - Task function that will be executed.
* @param task - Input data for the task function.
*/
this.sendToMainWorker({
data: res,
taskPerformance,
- taskId
+ taskId,
})
return undefined
})
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
name: name!,
message: this.handleError(error as Error | string),
- data
+ data,
},
- taskId
+ taskId,
})
})
.finally(() => {
name: name ?? DEFAULT_TASK_NAME,
timestamp: performance.now(),
...(this.statistics.elu && {
- elu: performance.eventLoopUtilization()
- })
+ elu: performance.eventLoopUtilization(),
+ }),
}
}
return {
...taskPerformance,
...(this.statistics.runTime && {
- runTime: performance.now() - taskPerformance.timestamp
+ runTime: performance.now() - taskPerformance.timestamp,
}),
...(this.statistics.elu && {
- elu: performance.eventLoopUtilization(taskPerformance.elu)
- })
+ elu: performance.eventLoopUtilization(taskPerformance.elu),
+ }),
}
}