import type {
MessageValue,
PromiseResponseWrapper,
- Task
+ Task,
+ TaskFunctionProperties
} from '../utility-types.js'
import {
average,
+ buildTaskFunctionProperties,
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
exponentialDelay,
round,
sleep
} from '../utils.js'
-import type { TaskFunction } from '../worker/task-functions.js'
+import type {
+ TaskFunction,
+ TaskFunctionObject
+} from '../worker/task-functions.js'
import { KillBehaviors } from '../worker/worker-options.js'
import {
type IPool,
/**
* The task functions added at runtime map:
* - `key`: The task function name.
- * - `value`: The task function itself.
+ * - `value`: The task function object.
*/
- private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+ private readonly taskFunctions: Map<
+ string,
+ TaskFunctionObject<Data, Response>
+ >
/**
* Whether the pool is started or not.
this.setupHook()
- this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+ this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
this.started = false
this.starting = false
public hasTaskFunction (name: string): boolean {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.includes(name)
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
) {
return true
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- fn: TaskFunction<Data, Response>
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
if (typeof name !== 'string') {
throw new TypeError('name argument must be a string')
if (typeof name === 'string' && name.trim().length === 0) {
throw new TypeError('name argument must not be an empty string')
}
- if (typeof fn !== 'function') {
- throw new TypeError('fn argument must be a function')
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
+ }
+ if (typeof fn.taskFunction !== 'function') {
+ throw new TypeError('taskFunction property must be a function')
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: name,
- taskFunction: fn.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+ taskFunction: fn.taskFunction.toString()
})
this.taskFunctions.set(name, fn)
return opResult
}
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ )
})
this.deleteTaskFunctionWorkerUsages(name)
this.taskFunctions.delete(name)
}
/** @inheritDoc */
- public listTaskFunctionNames (): string[] {
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctionNames) &&
- workerNode.info.taskFunctionNames.length > 0
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
) {
- return workerNode.info.taskFunctionNames
+ return workerNode.info.taskFunctionsProperties
}
}
return []
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'default',
- taskFunctionName: name
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ )
})
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctionNames) &&
- workerInfo.taskFunctionNames.length > 2
+ Array.isArray(workerInfo.taskFunctionsProperties) &&
+ workerInfo.taskFunctionsProperties.length > 2
)
}
checkActive: true
})
if (this.taskFunctions.size > 0) {
- for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
this.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName,
- taskFunction: taskFunction.toString()
+ taskFunctionProperties: buildTaskFunctionProperties(
+ taskFunctionName,
+ taskFunctionObject
+ ),
+ taskFunction: taskFunctionObject.taskFunction.toString()
}).catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
) {
workerInfo.stealing = false
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- for (const taskName of workerInfo.taskFunctionNames!) {
+ for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
workerNodeKey,
- taskName
+ taskFunctionProperties.name
)
}
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
message: MessageValue<Response>
): void => {
this.checkMessageWorkerId(message)
- const { workerId, ready, taskId, taskFunctionNames } = message
- if (ready != null && taskFunctionNames != null) {
+ const { workerId, ready, taskId, taskFunctionsProperties } = message
+ if (ready != null && taskFunctionsProperties != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
- } else if (taskId != null) {
- // Task execution response received from worker
- this.handleTaskExecutionResponse(message)
- } else if (taskFunctionNames != null) {
- // Task function names message received from worker
+ } else if (taskFunctionsProperties != null) {
+ // Task function properties message received from worker
const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(workerId)
)
if (workerInfo != null) {
- workerInfo.taskFunctionNames = taskFunctionNames
+ workerInfo.taskFunctionsProperties = taskFunctionsProperties
}
+ } else if (taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
- const { workerId, ready, taskFunctionNames } = message
+ const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
throw new Error(`Worker ${workerId} failed to initialize`)
}
const workerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
workerNode.info.ready = ready
- workerNode.info.taskFunctionNames = taskFunctionNames
+ workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.checkAndEmitReadyEvent()
}