+// eslint-disable-next-line n/no-unsupported-features/node-builtins
+import { createHook, executionAsyncId } from 'node:async_hooks'
import { EventEmitterAsyncResource } from 'node:events'
-import { dirname, join } from 'node:path'
import { readFileSync } from 'node:fs'
+import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
-import { createHook, executionAsyncId } from 'node:async_hooks'
+
import { expect } from 'expect'
import { restore, stub } from 'sinon'
+
+import { CircularArray } from '../../lib/circular-array.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
PoolTypes,
WorkerChoiceStrategies,
WorkerTypes
-} from '../../lib/index.js'
-import { CircularArray } from '../../lib/circular-array.js'
-import { Deque } from '../../lib/deque.js'
-import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
-import { waitPoolEvents } from '../test-utils.js'
-import { WorkerNode } from '../../lib/pools/worker-node.js'
+} from '../../lib/index.cjs'
+import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
+import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
+import { waitPoolEvents } from '../test-utils.cjs'
describe('Abstract pool test suite', () => {
const version = JSON.parse(
it('Verify that a negative number of workers is checked', () => {
expect(
() =>
- new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
+ new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
).toThrow(
new RangeError(
'Cannot instantiate a pool with a negative number of workers'
new DynamicClusterPool(
1,
undefined,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
0,
0.5,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
1,
1,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new RangeError(
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+ expect(pool.emitter.eventNames()).toStrictEqual([])
expect(pool.opts).toStrictEqual({
startWorkers: true,
enableEvents: true,
enableTasksQueue: false,
workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual(
- expect.objectContaining({
- retries:
- pool.info.maxSize +
- Object.keys(workerChoiceStrategy.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(workerChoiceStrategy.opts).toStrictEqual({
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
})
- )
+ })
}
await pool.destroy()
const testHandler = () => console.info('test handler executed')
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
errorHandler: testHandler,
exitHandler: testHandler
})
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual(
- expect.objectContaining({
- retries:
- pool.info.maxSize +
- Object.keys(workerChoiceStrategy.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(workerChoiceStrategy.opts).toStrictEqual({
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
})
- )
+ })
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
runTime: { median: true },
elu: { median: true }
})
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: true },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual(
- expect.objectContaining({
- retries:
- pool.info.maxSize +
- Object.keys(workerChoiceStrategy.opts.weights).length,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: true }
+ expect(workerChoiceStrategy.opts).toStrictEqual({
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
})
- )
+ })
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
median: true
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
runTime: { median: false },
elu: { median: false }
})
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual(
- expect.objectContaining({
- retries:
- pool.info.maxSize +
- Object.keys(workerChoiceStrategy.opts.weights).length,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(workerChoiceStrategy.opts).toStrictEqual({
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
})
- )
+ })
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
workerNodes: numberOfWorkers,
pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.info).toStrictEqual({
version,
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
workerNodes: Math.floor(numberOfWorkers / 2),
it('Verify that pool worker tasks usage are initialized', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
it('Verify that pool worker tasks queue are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
pool = new DynamicThreadPool(
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
})
it('Verify that pool worker info are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
id: expect.any(Number),
type: WorkerTypes.cluster,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
id: expect.any(Number),
type: WorkerTypes.thread,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js',
+ './tests/worker-files/cluster/testWorker.cjs',
{
startWorkers: false
}
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
- expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
+ expect(pool.readyEventEmitted).toBe(false)
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
)
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(pool.execute(undefined, 0)).rejects.toThrow(
new TypeError('name argument must be a string')
it('Verify that pool worker tasks usage are computed', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const promises = new Set()
const maxMultiplier = 2
await pool.destroy()
})
- it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
+ it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: 0,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,
}
}
})
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
expect(workerNode.usage.runTime.history.length).toBe(0)
expect(workerNode.usage.waitTime.history.length).toBe(0)
expect(workerNode.usage.elu.idle.history.length).toBe(0)
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.emitter.eventNames()).toStrictEqual([])
let poolInfo
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
+ stealingWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.callCount).toBe(5)
+ expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
})
const startTime = performance.now()
await pool.destroy()
const elapsedTime = performance.now() - startTime
- expect(tasksFinished).toBeGreaterThanOrEqual(numberOfWorkers)
expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
expect(elapsedTime).toBeGreaterThanOrEqual(2000)
- expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
new TypeError('name argument must not be an empty string')
)
await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
)
await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: -21
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: 20
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ strategy: 'invalidStrategy'
+ })
+ ).rejects.toThrow(
+ new Error("Invalid worker choice strategy 'invalidStrategy'")
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
const echoTaskFunction = data => {
return data
}
await expect(
- dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
waitTime: {
history: new CircularArray()
},
- elu: {
- idle: {
- history: new CircularArray()
- },
- active: {
- history: new CircularArray()
- }
- }
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ })
})
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+ ).toBeGreaterThan(0)
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+ null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeGreaterThan(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeGreaterThanOrEqual(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeGreaterThanOrEqual(0)
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeLessThanOrEqual(1)
+ }
}
await dynamicThreadPool.destroy()
})
'./tests/worker-files/thread/testWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
new Error('Cannot remove a task function not handled on the pool side')
const echoTaskFunction = data => {
return data
}
- await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ await dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' }
])
await dynamicThreadPool.destroy()
})
- it('Verify that listTaskFunctionNames() is working', async () => {
+ it('Verify that listTaskFunctionsProperties() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await fixedClusterPool.destroy()
})
`Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('factorial')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'factorial',
- 'jsonIntegerSerialization',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'factorial' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'fibonacci' }
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('fibonacci')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'fibonacci',
- 'jsonIntegerSerialization',
- 'factorial'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fibonacci' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' }
])
await dynamicThreadPool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
const data = { n: 10 }
const result0 = await pool.execute(data)
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' }
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctionNames()) {
- expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
}
})
expect(
- workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
).toBeGreaterThan(0)
}
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
workerNode.getTaskFunctionWorkerUsage(
- workerNode.info.taskFunctionNames[1]
+ workerNode.info.taskFunctionsProperties[1].name
)
)
}
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
+ taskFunctionProperties: { name: 'empty' },
taskFunction: (() => {}).toString()
})
).resolves.toBe(true)
expect(
- pool.workerNodes[workerNodeKey].info.taskFunctionNames
- ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+ pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
+ ).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' }
+ ])
await pool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(
pool.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
+ taskFunctionProperties: { name: 'empty' },
taskFunction: (() => {}).toString()
})
).resolves.toBe(true)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'empty'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' }
])
}
await pool.destroy()