it('Verify that filePath is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
- new Error("Cannot find the worker file 'undefined'")
+ new TypeError('The worker file path must be specified')
+ )
+ expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+ new TypeError('The worker file path must be a string')
)
expect(
() => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
)
})
+ it('Verify that pool arguments number and pool type are checked', () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ undefined,
+ numberOfWorkers * 2
+ )
+ ).toThrow(
+ new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ )
+ })
+
it('Verify that dynamic pool sizing is checked', () => {
expect(
() =>
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
- workerChoiceStrategyOptions: {
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
weights: { 0: 300, 1: 200 }
},
onlineHandler: testHandler,
exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: 'invalidChoiceRetries'
- }
- }
- )
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: -1
- }
- }
- )
- ).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(
() =>
new FixedThreadPool(
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
elu: { median: true }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: true }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
elu: { median: true }
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: false },
- waitTime: { median: false },
elu: { median: false }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
'Invalid worker choice strategy options: must be a plain object'
)
)
- expect(() =>
- pool.setWorkerChoiceStrategyOptions({
- retries: 'invalidChoiceRetries'
- })
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
- tasksStealingOnBackPressure: false
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 3000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
+ tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.called).toBe(true)
+ expect(pool.hasBackPressure.callCount).toBe(5)
+ await pool.destroy()
+ })
+
+ it('Verify that destroy() waits for queued tasks to finish', async () => {
+ const tasksFinishedTimeout = 2500
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+ expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+ })
+
+ it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+ const tasksFinishedTimeout = 1000
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(0)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
})
it('Verify that pool asynchronous resource track tasks execution', async () => {
await expect(
pool.sendKillMessageToWorker(workerNodeKey)
).resolves.toBeUndefined()
+ await expect(
+ pool.sendKillMessageToWorker(numberOfWorkers)
+ ).rejects.toStrictEqual(
+ new Error(`Invalid worker node key '${numberOfWorkers}'`)
+ )
await pool.destroy()
})