workerNodeKey: number,
message: MessageValue<Data>
): Promise<boolean> {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
return await new Promise<boolean>((resolve, reject) => {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
this.registerWorkerMessageListener(workerNodeKey, message => {
if (
message.workerId === workerId &&
}
private async sendTaskFunctionOperationToWorkers (
- message: Omit<MessageValue<Data>, 'workerId'>
+ message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived = new Array<MessageValue<Data | Response>>()
if (typeof fn !== 'function') {
throw new TypeError('fn argument must be a function')
}
- this.taskFunctions.set(name, fn)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
taskFunction: fn.toString()
})
+ this.taskFunctions.set(name, fn)
+ return opResult
}
/** @inheritDoc */
'Cannot remove a task function not handled on the pool side'
)
}
- this.taskFunctions.delete(name)
- return await this.sendTaskFunctionOperationToWorkers({
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
+ this.deleteTaskFunctionWorkerUsages(name)
+ this.taskFunctions.delete(name)
+ return opResult
}
/** @inheritDoc */
})
}
+ private deleteTaskFunctionWorkerUsages (name: string): void {
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
return this.taskFunctionsUsage.get(name)
}
+ /** @inheritdoc */
+ public deleteTaskFunctionWorkerUsage (name: string): boolean {
+ return this.taskFunctionsUsage.delete(name)
+ }
+
private async startOnEmptyQueue (): Promise<void> {
if (
this.onEmptyQueueCount > 0 &&
* @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
*/
readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
+ /**
+ * Deletes task function worker usage statistics.
+ *
+ * @param name - The task function name.
+ * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
+ */
+ readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
new Function(`return ${taskFunction as string}`)() as TaskFunction<
Data,
Response
- > /* NOSONAR */
+ >
)
} else if (taskFunctionOperation === 'remove') {
response = this.removeTaskFunction(taskFunctionName as string)
this.sendToMainWorker({
taskFunctionOperation,
taskFunctionOperationStatus: response.status,
- workerError: {
- name: taskFunctionName as string,
- message: this.handleError(response.error as Error | string)
- }
+ taskFunctionName,
+ ...(!response.status &&
+ response?.error != null && {
+ workerError: {
+ name: taskFunctionName as string,
+ message: this.handleError(response.error as Error | string)
+ }
+ })
})
}
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
expect(echoResult).toStrictEqual(taskFunctionData)
+ for (const workerNode of dynamicThreadPool.workerNodes) {
+ expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ stolen: 0,
+ failed: 0
+ },
+ runTime: {
+ history: new CircularArray()
+ },
+ waitTime: {
+ history: new CircularArray()
+ },
+ elu: {
+ idle: {
+ history: new CircularArray()
+ },
+ active: {
+ history: new CircularArray()
+ }
+ }
+ })
+ }
await dynamicThreadPool.destroy()
})
)
const workerNodeKey = 0
await expect(
- pool.sendKillMessageToWorker(
- workerNodeKey,
- pool.workerNodes[workerNodeKey].info.id
- )
+ pool.sendKillMessageToWorker(workerNodeKey)
).resolves.toBeUndefined()
await pool.destroy()
})
+
+ it('Verify sendTaskFunctionOperationToWorker()', async () => {
+ const pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ const workerNodeKey = 0
+ await expect(
+ pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
+ taskFunctionOperation: 'add',
+ taskFunctionName: 'empty',
+ taskFunction: (() => {}).toString()
+ })
+ ).resolves.toBe(true)
+ expect(
+ pool.workerNodes[workerNodeKey].info.taskFunctionNames
+ ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+ await pool.destroy()
+ })
+
+ it('Verify sendTaskFunctionOperationToWorkers()', async () => {
+ const pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ await waitPoolEvents(pool, PoolEvents.ready, 1)
+ await expect(
+ pool.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'add',
+ taskFunctionName: 'empty',
+ taskFunction: (() => {}).toString()
+ })
+ ).resolves.toBe(true)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info.taskFunctionNames).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test',
+ 'empty'
+ ])
+ }
+ await pool.destroy()
+ })
})
})
expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
+
+ it('Worker node deleteTaskFunctionWorkerUsage()', () => {
+ expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'fn1',
+ 'fn2'
+ ])
+ expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+ expect(
+ threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
+ ).toBe(false)
+ expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+ expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
+ expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
+ })
})