import { dirname, join } from 'node:path'
import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
import { expect } from 'expect'
import { restore, stub } from 'sinon'
import {
restore()
})
- it('Simulate pool creation from a non main thread/process', () => {
+ it('Verify that pool can be created and destroyed', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool).toBeInstanceOf(FixedThreadPool)
+ await pool.destroy()
+ })
+
+ it('Verify that pool cannot be created from a non main thread/process', () => {
expect(
() =>
new StubPoolWithIsMain(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs'
)
- expect(pool.starting).toBe(false)
expect(pool.started).toBe(true)
+ expect(pool.starting).toBe(false)
+ expect(pool.destroying).toBe(false)
await pool.destroy()
})
it('Verify that filePath is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
- new Error("Cannot find the worker file 'undefined'")
+ new TypeError('The worker file path must be specified')
+ )
+ expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+ new TypeError('The worker file path must be a string')
)
expect(
() => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
)
})
+ it('Verify that pool arguments number and pool type are checked', () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ undefined,
+ numberOfWorkers * 2
+ )
+ ).toThrow(
+ new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ )
+ })
+
it('Verify that dynamic pool sizing is checked', () => {
expect(
() =>
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
- workerChoiceStrategyOptions: {
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(workerChoiceStrategy.opts).toStrictEqual(
+ expect.objectContaining({
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ // weights: expect.objectContaining({
+ // 0: expect.any(Number),
+ // [pool.info.maxSize - 1]: expect.any(Number)
+ // })
+ })
+ )
}
await pool.destroy()
const testHandler = () => console.info('test handler executed')
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
weights: { 0: 300, 1: 200 }
},
onlineHandler: testHandler,
exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: 'invalidChoiceRetries'
- }
- }
- )
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: -1
- }
- }
- )
- ).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(
() =>
new FixedThreadPool(
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
elu: { median: true }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: true },
waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
runTime: { median: true },
waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: false },
- waitTime: { median: false },
elu: { median: false }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
}
expect(
'Invalid worker choice strategy options: must be a plain object'
)
)
- expect(() =>
- pool.setWorkerChoiceStrategyOptions({
- retries: 'invalidChoiceRetries'
- })
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
await pool.destroy()
})
+ it('Verify that pool statuses are checked at start or destroy', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(() => pool.start()).toThrow(
+ new Error('Cannot start an already started pool')
+ )
+ await pool.destroy()
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ await expect(pool.destroy()).rejects.toThrow(
+ new Error('Cannot destroy an already destroyed pool')
+ )
+ })
+
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
+ expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
pool.start()
expect(pool.info.started).toBe(true)
expect(pool.info.ready).toBe(true)
+ await waitPoolEvents(pool, PoolEvents.ready, 1)
+ expect(pool.readyEventEmitted).toBe(true)
expect(pool.workerNodes.length).toBe(numberOfWorkers)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
executing: maxMultiplier,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.called).toBe(true)
+ expect(pool.hasBackPressure.callCount).toBe(5)
+ await pool.destroy()
+ })
+
+ it('Verify that destroy() waits for queued tasks to finish', async () => {
+ const tasksFinishedTimeout = 2500
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+ expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+ })
+
+ it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+ const tasksFinishedTimeout = 1000
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(0)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
+ })
+
+ it('Verify that pool asynchronous resource track tasks execution', async () => {
+ let taskAsyncId
+ let initCalls = 0
+ let beforeCalls = 0
+ let afterCalls = 0
+ let resolveCalls = 0
+ const hook = createHook({
+ init (asyncId, type) {
+ if (type === 'poolifier:task') {
+ initCalls++
+ taskAsyncId = asyncId
+ }
+ },
+ before (asyncId) {
+ if (asyncId === taskAsyncId) beforeCalls++
+ },
+ after (asyncId) {
+ if (asyncId === taskAsyncId) afterCalls++
+ },
+ promiseResolve () {
+ if (executionAsyncId() === taskAsyncId) resolveCalls++
+ }
+ })
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ hook.enable()
+ await pool.execute()
+ hook.disable()
+ expect(initCalls).toBe(1)
+ expect(beforeCalls).toBe(1)
+ expect(afterCalls).toBe(1)
+ expect(resolveCalls).toBe(1)
await pool.destroy()
})
executed: expect.any(Number),
executing: 0,
queued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ const workerId = dynamicThreadPool.workerNodes[0].info.id
await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction('unknown')
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
executing: 0,
failed: 0,
queued: 0,
+ sequentiallyStolen: 0,
stolen: 0
},
runTime: {
await expect(
pool.sendKillMessageToWorker(workerNodeKey)
).resolves.toBeUndefined()
+ await expect(
+ pool.sendKillMessageToWorker(numberOfWorkers)
+ ).rejects.toStrictEqual(
+ new Error(`Invalid worker node key '${numberOfWorkers}'`)
+ )
await pool.destroy()
})