- [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname)
- [`pool.addTaskFunction(name, fn)`](#pooladdtaskfunctionname-fn)
- [`pool.removeTaskFunction(name)`](#poolremovetaskfunctionname)
- - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
+ - [`pool.listTaskFunctionsProperties()`](#poollisttaskfunctionsproperties)
- [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname)
- [Pool options](#pool-options)
- [Worker](#worker)
- [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
- [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
- [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
- - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
+ - [`YourWorker.listTaskFunctionsProperties()`](#yourworkerlisttaskfunctionsproperties)
- [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
## Pool
This method is available on both pool implementations and returns a boolean promise.
-### `pool.listTaskFunctionNames()`
+### `pool.listTaskFunctionsProperties()`
-This method is available on both pool implementations and returns an array of the task function names.
+This method is available on both pool implementations and returns an array of the task function properties.
### `pool.setDefaultTaskFunction(name)`
This method is available on both worker implementations and returns `{ status: boolean, error?: Error }`.
-#### `YourWorker.listTaskFunctionNames()`
+#### `YourWorker.listTaskFunctionsProperties()`
-This method is available on both worker implementations and returns an array of the task function names.
+This method is available on both worker implementations and returns an array of the task function properties.
#### `YourWorker.setDefaultTaskFunction(name)`
}
}
+ /**
+ * Increments the size of the deque.
+ *
+ * @returns The new size of the deque.
+ */
private incrementSize (): number {
++this.size
if (this.size > this.maxSize) {
import type {
MessageValue,
PromiseResponseWrapper,
- Task
+ Task,
+ TaskFunctionProperties
} from '../utility-types.js'
import {
average,
+ buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
exponentialDelay,
round,
sleep
} from '../utils.js'
-import type { TaskFunction } from '../worker/task-functions.js'
+import type {
+ TaskFunction,
+ TaskFunctionObject
+} from '../worker/task-functions.js'
import { KillBehaviors } from '../worker/worker-options.js'
import {
type IPool,
/**
* The task functions added at runtime map:
* - `key`: The task function name.
- * - `value`: The task function itself.
+ * - `value`: The task function object.
*/
- private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+ private readonly taskFunctions: Map<
+ string,
+ TaskFunctionObject<Data, Response>
+ >
/**
* Whether the pool is started or not.
this.setupHook()
- this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+ this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
this.started = false
this.starting = false
public hasTaskFunction (name: string): boolean {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.includes(name)
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
) {
return true
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- fn: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
if (typeof name !== 'string') {
throw new TypeError('name argument must be a string')
if (typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
- if (typeof fn !== 'function') {
- throw new TypeError('fn argument must be a function')
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
+ }
+ if (typeof fn.taskFunction !== 'function') {
+ throw new TypeError('taskFunction property must be a function')
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: name,
- taskFunction: fn.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+ taskFunction: fn.taskFunction.toString()
})
this.taskFunctions.set(name, fn)
return opResult
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ )
})
this.deleteTaskFunctionWorkerUsages(name)
this.taskFunctions.delete(name)
}
/** @inheritDoc */
- public listTaskFunctionNames (): string[] {
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.length > 0
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
) {
- return workerNode.info.taskFunctionNames
+ return workerNode.info.taskFunctionsProperties
}
}
return []
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'default',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ )
})
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctionNames) &&
- workerInfo.taskFunctionNames.length > 2
+ Array.isArray(workerInfo.taskFunctionsProperties) &&
+ workerInfo.taskFunctionsProperties.length > 2
)
}
checkActive: true
})
if (this.taskFunctions.size > 0) {
- for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
this.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName,
- taskFunction: taskFunction.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(
+ taskFunctionName,
+ taskFunctionObject
+ ),
+ taskFunction: taskFunctionObject.taskFunction.toString()
}).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
) {
workerInfo.stealing = false
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskName of workerInfo.taskFunctionNames!) {
+ for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- taskName
+ taskFunctionProperties.name
)
}
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const { workerId, ready, taskId, taskFunctionNames } = message
- if (ready != null && taskFunctionNames != null) {
+ const { workerId, ready, taskId, taskFunctionsProperties } = message
+ if (ready != null && taskFunctionsProperties != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
- } else if (taskId != null) {
- // Task execution response received from worker
- this.handleTaskExecutionResponse(message)
- } else if (taskFunctionNames != null) {
- // Task function names message received from worker
+ } else if (taskFunctionsProperties != null) {
+ // Task function properties message received from worker
const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
)
if (workerInfo != null) {
- workerInfo.taskFunctionNames = taskFunctionNames
+ workerInfo.taskFunctionsProperties = taskFunctionsProperties
}
+ } else if (taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- const { workerId, ready, taskFunctionNames } = message
+ const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
throw new Error(`Worker ${workerId} failed to initialize`)
}
const workerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
- workerNode.info.taskFunctionNames = taskFunctionNames
+ workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.checkAndEmitReadyEvent()
}
import type { EventEmitterAsyncResource } from 'node:events'
import type { TransferListItem, WorkerOptions } from 'node:worker_threads'
+import type { TaskFunctionProperties } from '../utility-types.js'
import type { TaskFunction } from '../worker/task-functions.js'
import type {
WorkerChoiceStrategy,
*/
readonly removeTaskFunction: (name: string) => Promise<boolean>
/**
- * Lists the names of task function available in this pool.
+ * Lists the properties of task functions available in this pool.
*
- * @returns The names of task function available in this pool.
+ * @returns The properties of task functions available in this pool.
*/
- readonly listTaskFunctionNames: () => string[]
+ readonly listTaskFunctionsProperties: () => TaskFunctionProperties[]
/**
* Sets the default task function in this pool.
*
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
- if (!Array.isArray(this.info.taskFunctionNames)) {
+ if (!Array.isArray(this.info.taskFunctionsProperties)) {
throw new Error(
- `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
+ `Cannot get task function worker usage for task function name '${name}' when task function properties list is not yet defined`
)
}
if (
- Array.isArray(this.info.taskFunctionNames) &&
- this.info.taskFunctionNames.length < 3
+ Array.isArray(this.info.taskFunctionsProperties) &&
+ this.info.taskFunctionsProperties.length < 3
) {
throw new Error(
- `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
+ `Cannot get task function worker usage for task function name '${name}' when task function properties list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
- name = this.info.taskFunctionNames[1]
+ name = this.info.taskFunctionsProperties[1].name
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
if (
(task.name === DEFAULT_TASK_NAME &&
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- name === this.info.taskFunctionNames![1]) ||
+ name === this.info.taskFunctionsProperties![1].name) ||
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize
import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
import type { CircularArray } from '../circular-array.js'
-import type { Task } from '../utility-types.js'
+import type { Task, TaskFunctionProperties } from '../utility-types.js'
/**
* Callback invoked when the worker has started successfully.
*/
stealing: boolean
/**
- * Task function names.
+ * Task functions properties.
*/
- taskFunctionNames?: string[]
+ taskFunctionsProperties?: TaskFunctionProperties[]
}
/**
import type { EventLoopUtilization } from 'node:perf_hooks'
import type { MessagePort, TransferListItem } from 'node:worker_threads'
+import type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types.js'
import type { KillBehavior } from './worker/worker-options.js'
/**
readonly elu: boolean
}
+/**
+ * Task function properties.
+ *
+ * @internal
+ */
+export interface TaskFunctionProperties {
+ /**
+ * Task function name.
+ */
+ name: string
+ /**
+ * Task function priority. Lower values have higher priority.
+ */
+ priority?: number
+ /**
+ * Task function worker choice strategy.
+ */
+ strategy?: WorkerChoiceStrategy
+}
+
/**
* Message object that is passed as a task between main worker and worker.
*
*/
readonly taskFunctionOperationStatus?: boolean
/**
- * Task function serialized to string.
+ * Task function properties.
*/
- readonly taskFunction?: string
+ readonly taskFunctionProperties?: TaskFunctionProperties
/**
- * Task function name.
+ * Task function serialized to string.
*/
- readonly taskFunctionName?: string
+ readonly taskFunction?: string
/**
- * Task function names.
+ * Task functions properties.
*/
- readonly taskFunctionNames?: string[]
+ readonly taskFunctionsProperties?: TaskFunctionProperties[]
/**
* Whether the worker computes the given statistics or not.
*/
import { getRandomValues } from 'node:crypto'
import * as os from 'node:os'
+import type { TaskFunctionProperties } from './utility-types.js'
+import type { TaskFunctionObject } from './worker/task-functions.js'
import type { KillBehavior } from './worker/worker-options.js'
/**
return result
}
}
+
+export const buildTaskFunctionProperties = <Data, Response>(
+ name: string,
+ taskFunctionObject: TaskFunctionObject<Data, Response> | undefined
+): TaskFunctionProperties => {
+ return {
+ name,
+ ...(taskFunctionObject?.priority != null && {
+ priority: taskFunctionObject.priority
+ }),
+ ...(taskFunctionObject?.strategy != null && {
+ strategy: taskFunctionObject.strategy
+ })
+ }
+}
import type {
MessageValue,
Task,
+ TaskFunctionProperties,
TaskPerformance,
WorkerStatistics
} from '../utility-types.js'
import {
+ buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
isAsyncFunction,
import type {
TaskAsyncFunction,
TaskFunction,
+ TaskFunctionObject,
TaskFunctionOperationResult,
TaskFunctions,
TaskSyncFunction
*/
protected abstract id: number
/**
- * Task function(s) processed by the worker when the pool's `execution` function is invoked.
+ * Task function object(s) processed by the worker when the pool's `execution` function is invoked.
*/
- protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
+ protected taskFunctions!: Map<string, TaskFunctionObject<Data, Response>>
/**
* Timestamp of the last task processed by this worker.
*/
if (taskFunctions == null) {
throw new Error('taskFunctions parameter is mandatory')
}
- this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+ this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
if (typeof taskFunctions === 'function') {
- const boundFn = taskFunctions.bind(this)
- this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
+ const fnObj = { taskFunction: taskFunctions.bind(this) }
+ this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
this.taskFunctions.set(
typeof taskFunctions.name === 'string' &&
taskFunctions.name.trim().length > 0
? taskFunctions.name
: 'fn1',
- boundFn
+ fnObj
)
} else if (isPlainObject(taskFunctions)) {
let firstEntry = true
- for (const [name, fn] of Object.entries(taskFunctions)) {
- checkValidTaskFunctionEntry<Data, Response>(name, fn)
- const boundFn = fn.bind(this)
+ for (let [name, fnObj] of Object.entries(taskFunctions)) {
+ if (typeof fnObj === 'function') {
+ fnObj = { taskFunction: fnObj } satisfies TaskFunctionObject<
+ Data,
+ Response
+ >
+ }
+ checkValidTaskFunctionEntry<Data, Response>(name, fnObj)
+ fnObj.taskFunction = fnObj.taskFunction.bind(this)
if (firstEntry) {
- this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
+ this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
firstEntry = false
}
- this.taskFunctions.set(name, boundFn)
+ this.taskFunctions.set(name, fnObj)
}
if (firstEntry) {
throw new Error('taskFunctions parameter object is empty')
*/
public addTaskFunction (
name: string,
- fn: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): TaskFunctionOperationResult {
try {
checkTaskFunctionName(name)
'Cannot add a task function with the default reserved name'
)
}
- if (typeof fn !== 'function') {
- throw new TypeError('fn parameter is not a function')
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
}
- const boundFn = fn.bind(this)
+ checkValidTaskFunctionEntry<Data, Response>(name, fn)
+ fn.taskFunction = fn.taskFunction.bind(this)
if (
this.taskFunctions.get(name) ===
this.taskFunctions.get(DEFAULT_TASK_NAME)
) {
- this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
+ this.taskFunctions.set(DEFAULT_TASK_NAME, fn)
}
- this.taskFunctions.set(name, boundFn)
- this.sendTaskFunctionNamesToMainWorker()
+ this.taskFunctions.set(name, fn)
+ this.sendTaskFunctionsPropertiesToMainWorker()
return { status: true }
} catch (error) {
return { status: false, error: error as Error }
)
}
const deleteStatus = this.taskFunctions.delete(name)
- this.sendTaskFunctionNamesToMainWorker()
+ this.sendTaskFunctionsPropertiesToMainWorker()
return { status: deleteStatus }
} catch (error) {
return { status: false, error: error as Error }
}
/**
- * Lists the names of the worker's task functions.
+ * Lists the properties of the worker's task functions.
*
- * @returns The names of the worker's task functions.
+ * @returns The properties of the worker's task functions.
*/
- public listTaskFunctionNames (): string[] {
- const names = [...this.taskFunctions.keys()]
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
let defaultTaskFunctionName = DEFAULT_TASK_NAME
- for (const [name, fn] of this.taskFunctions) {
+ for (const [name, fnObj] of this.taskFunctions) {
if (
name !== DEFAULT_TASK_NAME &&
- fn === this.taskFunctions.get(DEFAULT_TASK_NAME)
+ fnObj === this.taskFunctions.get(DEFAULT_TASK_NAME)
) {
defaultTaskFunctionName = name
break
}
}
+ const taskFunctionsProperties: TaskFunctionProperties[] = []
+ for (const [name, fnObj] of this.taskFunctions) {
+ if (name === DEFAULT_TASK_NAME || name === defaultTaskFunctionName) {
+ continue
+ }
+ taskFunctionsProperties.push(buildTaskFunctionProperties(name, fnObj))
+ }
return [
- names[names.indexOf(DEFAULT_TASK_NAME)],
- defaultTaskFunctionName,
- ...names.filter(
- name => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
- )
+ buildTaskFunctionProperties(
+ DEFAULT_TASK_NAME,
+ this.taskFunctions.get(DEFAULT_TASK_NAME)
+ ),
+ buildTaskFunctionProperties(
+ defaultTaskFunctionName,
+ this.taskFunctions.get(defaultTaskFunctionName)
+ ),
+ ...taskFunctionsProperties
]
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.taskFunctions.set(DEFAULT_TASK_NAME, this.taskFunctions.get(name)!)
- this.sendTaskFunctionNamesToMainWorker()
+ this.sendTaskFunctionsPropertiesToMainWorker()
return { status: true }
} catch (error) {
return { status: false, error: error as Error }
protected handleTaskFunctionOperationMessage (
message: MessageValue<Data>
): void {
- const { taskFunctionOperation, taskFunctionName, taskFunction } = message
- if (taskFunctionName == null) {
+ const { taskFunctionOperation, taskFunctionProperties, taskFunction } =
+ message
+ if (taskFunctionProperties == null) {
throw new Error(
- 'Cannot handle task function operation message without a task function name'
+ 'Cannot handle task function operation message without task function properties'
)
}
let response: TaskFunctionOperationResult
switch (taskFunctionOperation) {
case 'add':
- response = this.addTaskFunction(
- taskFunctionName,
+ response = this.addTaskFunction(taskFunctionProperties.name, {
// eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
- new Function(`return ${taskFunction}`)() as TaskFunction<
- Data,
- Response
- >
- )
+ taskFunction: new Function(
+ `return ${taskFunction}`
+ )() as TaskFunction<Data, Response>,
+ ...(taskFunctionProperties.priority != null && {
+ priority: taskFunctionProperties.priority
+ }),
+ ...(taskFunctionProperties.strategy != null && {
+ strategy: taskFunctionProperties.strategy
+ })
+ })
break
case 'remove':
- response = this.removeTaskFunction(taskFunctionName)
+ response = this.removeTaskFunction(taskFunctionProperties.name)
break
case 'default':
- response = this.setDefaultTaskFunction(taskFunctionName)
+ response = this.setDefaultTaskFunction(taskFunctionProperties.name)
break
default:
response = { status: false, error: new Error('Unknown task operation') }
this.sendToMainWorker({
taskFunctionOperation,
taskFunctionOperationStatus: response.status,
- taskFunctionName,
+ taskFunctionProperties,
...(!response.status &&
response.error != null && {
workerError: {
- name: taskFunctionName,
+ name: taskFunctionProperties.name,
message: this.handleError(response.error as Error | string)
}
})
): void
/**
- * Sends task function names to the main worker.
+ * Sends task functions properties to the main worker.
*/
- protected sendTaskFunctionNamesToMainWorker (): void {
+ protected sendTaskFunctionsPropertiesToMainWorker (): void {
this.sendToMainWorker({
- taskFunctionNames: this.listTaskFunctionNames()
+ taskFunctionsProperties: this.listTaskFunctionsProperties()
})
}
})
return
}
- const fn = this.taskFunctions.get(taskFunctionName)
+ const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction
if (isAsyncFunction(fn)) {
this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
} else {
this.getMainWorker().on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctionNames: this.listTaskFunctionNames()
+ taskFunctionsProperties: this.listTaskFunctionsProperties()
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctionNames: this.listTaskFunctionNames()
+ taskFunctionsProperties: this.listTaskFunctionsProperties()
})
}
}
+import type { WorkerChoiceStrategy } from '../pools/selection-strategies/selection-strategies-types.js'
+
/**
* Task synchronous function that can be executed.
*
| TaskSyncFunction<Data, Response>
| TaskAsyncFunction<Data, Response>
+/**
+ * Task function object.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
+ * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
+ */
+export interface TaskFunctionObject<Data = unknown, Response = unknown> {
+ /**
+ * Task function.
+ */
+ taskFunction: TaskFunction<Data, Response>
+ /**
+ * Task function priority. Lower values have higher priority.
+ */
+ priority?: number
+ /**
+ * Task function worker choice strategy.
+ */
+ strategy?: WorkerChoiceStrategy
+}
+
/**
* Tasks functions that can be executed.
- * This object can contain synchronous or asynchronous functions.
- * The key is the name of the function.
- * The value is the function itself.
+ * The key is the name of the task function or task function object.
+ * The value is the function or task function object.
*
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
*/
export type TaskFunctions<Data = unknown, Response = unknown> = Record<
string,
-TaskFunction<Data, Response>
+TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
>
/**
this.port.on('message', this.messageListener.bind(this))
this.sendToMainWorker({
ready: true,
- taskFunctionNames: this.listTaskFunctionNames()
+ taskFunctionsProperties: this.listTaskFunctionsProperties()
})
} catch {
this.sendToMainWorker({
ready: false,
- taskFunctionNames: this.listTaskFunctionNames()
+ taskFunctionsProperties: this.listTaskFunctionsProperties()
})
}
}
+import { checkValidWorkerChoiceStrategy } from '../pools/utils.js'
import { isPlainObject } from '../utils.js'
-import type { TaskFunction } from './task-functions.js'
+import type { TaskFunctionObject } from './task-functions.js'
import { KillBehaviors, type WorkerOptions } from './worker-options.js'
export const checkValidWorkerOptions = (
export const checkValidTaskFunctionEntry = <Data = unknown, Response = unknown>(
name: string,
- fn: TaskFunction<Data, Response>
+ fnObj: TaskFunctionObject<Data, Response>
): void => {
if (typeof name !== 'string') {
throw new TypeError('A taskFunctions parameter object key is not a string')
'A taskFunctions parameter object key is an empty string'
)
}
- if (typeof fn !== 'function') {
+ if (typeof fnObj.taskFunction !== 'function') {
throw new TypeError(
- 'A taskFunctions parameter object value is not a function'
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `taskFunction object 'taskFunction' property '${fnObj.taskFunction}' is not a function`
)
}
+ if (fnObj.priority != null && !Number.isSafeInteger(fnObj.priority)) {
+ throw new TypeError(
+ `taskFunction object 'priority' property '${fnObj.priority}' is not an integer`
+ )
+ }
+ checkValidWorkerChoiceStrategy(fnObj.strategy)
}
export const checkTaskFunctionName = (name: string): void => {
new TypeError('name argument must not be an empty string')
)
await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
)
await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
const echoTaskFunction = data => {
return data
dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction
+ })
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo' }
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
'./tests/worker-files/thread/testWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
new Error('Cannot remove a task function not handled on the pool side')
}
await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction
+ })
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo' }
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
await dynamicThreadPool.destroy()
})
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
'./tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await fixedClusterPool.destroy()
})
`Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('factorial')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'factorial',
- 'jsonIntegerSerialization',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'factorial' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'fibonacci' }
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('fibonacci')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'fibonacci',
- 'jsonIntegerSerialization',
- 'factorial'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fibonacci' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' }
])
await dynamicThreadPool.destroy()
})
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctionNames()) {
- expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
}
})
expect(
- workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
).toBeGreaterThan(0)
}
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
workerNode.getTaskFunctionWorkerUsage(
- workerNode.info.taskFunctionNames[1]
+ workerNode.info.taskFunctionsProperties[1].name
)
)
}
await expect(
pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
+ taskFunctionProperties: { name: 'empty' },
taskFunction: (() => {}).toString()
})
).resolves.toBe(true)
expect(
- pool.workerNodes[workerNodeKey].info.taskFunctionNames
- ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+ pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
+ ).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' }
+ ])
await pool.destroy()
})
await expect(
pool.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
+ taskFunctionProperties: { name: 'empty' },
taskFunction: (() => {}).toString()
})
).resolves.toBe(true)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'empty'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' }
])
}
await pool.destroy()
threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
).toThrow(
new TypeError(
- "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
+ "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined"
)
)
- threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1']
+ threadWorkerNode.info.taskFunctionsProperties = [
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fn1' }
+ ]
expect(() =>
threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
).toThrow(
new TypeError(
- "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
+ "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements"
)
)
- threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+ threadWorkerNode.info.taskFunctionsProperties = [
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fn1' },
+ { name: 'fn2' }
+ ]
expect(
threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual({
})
it('Worker node deleteTaskFunctionWorkerUsage()', () => {
- expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'fn1',
- 'fn2'
+ expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fn1' },
+ { name: 'fn2' }
])
expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
expect(
it('Verify that taskFunctions parameter with unique function is taken', () => {
const worker = new ThreadWorker(() => {})
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(2)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
new TypeError('A taskFunctions parameter object key is an empty string')
)
expect(() => new ThreadWorker({ fn1, fn2 })).toThrow(
- new TypeError('A taskFunctions parameter object value is not a function')
+ new TypeError(
+ "taskFunction object 'taskFunction' property 'undefined' is not a function"
+ )
)
})
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
})
expect(worker.addTaskFunction('fn3', '')).toStrictEqual({
status: false,
- error: new TypeError('fn parameter is not a function')
+ error: new TypeError(
+ "taskFunction object 'taskFunction' property 'undefined' is not a function"
+ )
})
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(2)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
})
worker.addTaskFunction('fn2', fn2)
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
worker.addTaskFunction('fn1', fn1Replacement)
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
- expect(worker.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'fn1',
- 'fn2'
+ expect(worker.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fn1' },
+ { name: 'fn2' }
])
})
status: false,
error: new TypeError('name parameter is an empty string')
})
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
status: false,
error: new TypeError('name parameter is an empty string')
})
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
})
worker.removeTaskFunction('fn2')
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
expect(worker.taskFunctions.get('fn2')).toBeUndefined()
expect(worker.taskFunctions.size).toBe(2)
expect(worker.getMainWorker.calledTwice).toBe(true)
status: false,
error: new TypeError('name parameter is an empty string')
})
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Object)
expect(worker.taskFunctions.size).toBe(3)
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
worker.taskFunctions.get('fn1')
)
})
worker.removeTaskFunction('fn2')
- expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
- expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Object)
+ expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Object)
expect(worker.taskFunctions.get('fn2')).toBeUndefined()
expect(worker.taskFunctions.size).toBe(2)
expect(worker.port.postMessage.calledOnce).toBe(true)