PoolTypes,
WorkerChoiceStrategies,
WorkerTypes
-} from '../../lib/index.js'
-import { CircularArray } from '../../lib/circular-array.js'
-import { Deque } from '../../lib/deque.js'
-import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
-import { waitPoolEvents } from '../test-utils.js'
-import { WorkerNode } from '../../lib/pools/worker-node.js'
+} from '../../lib/index.cjs'
+import { CircularArray } from '../../lib/circular-array.cjs'
+import { Deque } from '../../lib/deque.cjs'
+import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
+import { waitPoolEvents } from '../test-utils.cjs'
+import { WorkerNode } from '../../lib/pools/worker-node.cjs'
describe('Abstract pool test suite', () => {
const version = JSON.parse(
it('Verify that filePath is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
- new Error("Cannot find the worker file 'undefined'")
+ new TypeError('The worker file path must be specified')
+ )
+ expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+ new TypeError('The worker file path must be a string')
)
expect(
() => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
it('Verify that a negative number of workers is checked', () => {
expect(
() =>
- new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
+ new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
).toThrow(
new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
})
+ it('Verify that pool arguments number and pool type are checked', () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ undefined,
+ numberOfWorkers * 2
+ )
+ ).toThrow(
+ new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ )
+ })
+
it('Verify that dynamic pool sizing is checked', () => {
expect(
() =>
new DynamicClusterPool(
1,
undefined,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
0,
0.5,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
1,
1,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new RangeError(
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+ expect(pool.emitter.eventNames()).toStrictEqual([])
expect(pool.opts).toStrictEqual({
startWorkers: true,
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
- workerChoiceStrategyOptions: {
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
await pool.destroy()
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
weights: { 0: 300, 1: 200 }
},
onlineHandler: testHandler,
errorHandler: testHandler,
exitHandler: testHandler
})
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
- })
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: 'invalidChoiceRetries'
- }
- }
- )
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: -1
- }
- }
- )
- ).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(
() =>
new FixedThreadPool(
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: true }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
elu: { median: true }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
runTime: { median: true },
waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
elu: { median: false }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
'Invalid worker choice strategy options: must be a plain object'
)
)
- expect(() =>
- pool.setWorkerChoiceStrategyOptions({
- retries: 'invalidChoiceRetries'
- })
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
workerNodes: numberOfWorkers,
pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.info).toStrictEqual({
version,
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
workerNodes: Math.floor(numberOfWorkers / 2),
it('Verify that pool worker tasks usage are initialized', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
it('Verify that pool worker tasks queue are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
it('Verify that pool worker info are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
id: expect.any(Number),
type: WorkerTypes.cluster,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false
})
}
await pool.destroy()
id: expect.any(Number),
type: WorkerTypes.thread,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false
})
}
await pool.destroy()
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js',
+ './tests/worker-files/cluster/testWorker.cjs',
{
startWorkers: false
}
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
- expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
+ expect(pool.readyEventEmitted).toBe(false)
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
)
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(pool.execute(undefined, 0)).rejects.toThrow(
new TypeError('name argument must be a string')
it('Verify that pool worker tasks usage are computed', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const promises = new Set()
const maxMultiplier = 2
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.emitter.eventNames()).toStrictEqual([])
let poolInfo
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
+ stealingWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.called).toBe(true)
+ expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
})
+ it('Verify that destroy() waits for queued tasks to finish', async () => {
+ const tasksFinishedTimeout = 2500
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
+ expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
+ })
+
+ it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+ const tasksFinishedTimeout = 1000
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(0)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
+ })
+
it('Verify that pool asynchronous resource track tasks execution', async () => {
let taskAsyncId
let initCalls = 0
if (executionAsyncId() === taskAsyncId) resolveCalls++
}
})
- hook.enable()
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs'
)
+ hook.enable()
await pool.execute()
hook.disable()
expect(initCalls).toBe(1)
expect(beforeCalls).toBe(1)
expect(afterCalls).toBe(1)
expect(resolveCalls).toBe(1)
+ await pool.destroy()
})
it('Verify that hasTaskFunction() is working', async () => {
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
const data = { n: 10 }
const result0 = await pool.execute(data)
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(
pool.sendTaskFunctionOperationToWorkers({