1 const { expect
} = require('expect')
8 WorkerChoiceStrategies
,
11 } = require('../../../lib')
12 const { CircularArray
} = require('../../../lib/circular-array')
13 const { Queue
} = require('../../../lib/queue')
15 describe('Abstract pool test suite', () => {
16 const numberOfWorkers
= 2
17 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
20 this.promiseResponseMap
.clear()
23 class StubPoolWithIsMain
extends FixedThreadPool
{
29 it('Simulate pool creation from a non main thread/process', () => {
32 new StubPoolWithIsMain(
34 './tests/worker-files/thread/testWorker.js',
36 errorHandler
: e
=> console
.error(e
)
39 ).toThrowError('Cannot start a pool from a worker!')
42 it('Verify that filePath is checked', () => {
43 const expectedError
= new Error(
44 'Please specify a file with a worker implementation'
46 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
54 it('Verify that numberOfWorkers is checked', () => {
55 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
60 it('Verify that a negative number of workers is checked', () => {
63 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
66 'Cannot instantiate a pool with a negative number of workers'
71 it('Verify that a non integer number of workers is checked', () => {
74 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
77 'Cannot instantiate a pool with a non safe integer number of workers'
82 it('Verify that pool options are checked', async () => {
83 let pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js'
87 expect(pool
.emitter
).toBeDefined()
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
92 expect(pool
.opts
.workerChoiceStrategy
).toBe(
93 WorkerChoiceStrategies
.ROUND_ROBIN
95 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
99 expect(pool
.opts
.messageHandler
).toBeUndefined()
100 expect(pool
.opts
.errorHandler
).toBeUndefined()
101 expect(pool
.opts
.onlineHandler
).toBeUndefined()
102 expect(pool
.opts
.exitHandler
).toBeUndefined()
104 const testHandler
= () => console
.log('test handler executed')
105 pool
= new FixedThreadPool(
107 './tests/worker-files/thread/testWorker.js',
109 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
110 workerChoiceStrategyOptions
: {
112 weights
: { 0: 300, 1: 200 }
115 restartWorkerOnError
: false,
116 enableTasksQueue
: true,
117 tasksQueueOptions
: { concurrency
: 2 },
118 messageHandler
: testHandler
,
119 errorHandler
: testHandler
,
120 onlineHandler
: testHandler
,
121 exitHandler
: testHandler
124 expect(pool
.emitter
).toBeUndefined()
125 expect(pool
.opts
.enableEvents
).toBe(false)
126 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
127 expect(pool
.opts
.enableTasksQueue
).toBe(true)
128 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
129 expect(pool
.opts
.workerChoiceStrategy
).toBe(
130 WorkerChoiceStrategies
.LEAST_USED
132 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
134 weights
: { 0: 300, 1: 200 }
136 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
137 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
138 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
139 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
143 it('Verify that pool options are validated', async () => {
148 './tests/worker-files/thread/testWorker.js',
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 0 }
154 ).toThrowError("Invalid worker tasks concurrency '0'")
159 './tests/worker-files/thread/testWorker.js',
161 workerChoiceStrategy
: 'invalidStrategy'
164 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
169 './tests/worker-files/thread/testWorker.js',
171 workerChoiceStrategyOptions
: { weights
: {} }
175 'Invalid worker choice strategy options: must have a weight for each worker node'
179 it('Verify that worker choice strategy options can be set', async () => {
180 const pool
= new FixedThreadPool(
182 './tests/worker-files/thread/testWorker.js',
183 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
185 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
189 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
190 .workerChoiceStrategies
) {
191 expect(workerChoiceStrategy
.opts
).toStrictEqual({
197 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
207 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: true })
208 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
211 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
212 .workerChoiceStrategies
) {
213 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: true })
216 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
226 pool
.setWorkerChoiceStrategyOptions({ medRunTime
: false })
227 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
230 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
231 .workerChoiceStrategies
) {
232 expect(workerChoiceStrategy
.opts
).toStrictEqual({ medRunTime
: false })
235 pool
.workerChoiceStrategyContext
.getRequiredStatistics()
248 it('Verify that tasks queue can be enabled/disabled', async () => {
249 const pool
= new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.js'
253 expect(pool
.opts
.enableTasksQueue
).toBe(false)
254 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
255 pool
.enableTasksQueue(true)
256 expect(pool
.opts
.enableTasksQueue
).toBe(true)
257 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
258 pool
.enableTasksQueue(true, { concurrency
: 2 })
259 expect(pool
.opts
.enableTasksQueue
).toBe(true)
260 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
261 pool
.enableTasksQueue(false)
262 expect(pool
.opts
.enableTasksQueue
).toBe(false)
263 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
267 it('Verify that tasks queue options can be set', async () => {
268 const pool
= new FixedThreadPool(
270 './tests/worker-files/thread/testWorker.js',
271 { enableTasksQueue
: true }
273 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
274 pool
.setTasksQueueOptions({ concurrency
: 2 })
275 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
276 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
277 "Invalid worker tasks concurrency '0'"
282 it('Verify that pool info is set', async () => {
283 let pool
= new FixedThreadPool(
285 './tests/worker-files/thread/testWorker.js'
287 expect(pool
.info
).toStrictEqual({
288 type
: PoolTypes
.fixed
,
289 worker
: WorkerTypes
.thread
,
290 minSize
: numberOfWorkers
,
291 maxSize
: numberOfWorkers
,
292 workerNodes
: numberOfWorkers
,
293 idleWorkerNodes
: numberOfWorkers
,
300 pool
= new DynamicClusterPool(
303 './tests/worker-files/thread/testWorker.js'
305 expect(pool
.info
).toStrictEqual({
306 type
: PoolTypes
.dynamic
,
307 worker
: WorkerTypes
.cluster
,
308 minSize
: numberOfWorkers
,
309 maxSize
: numberOfWorkers
* 2,
310 workerNodes
: numberOfWorkers
,
311 idleWorkerNodes
: numberOfWorkers
,
320 it('Simulate worker not found', async () => {
321 const pool
= new StubPoolWithRemoveAllWorker(
323 './tests/worker-files/cluster/testWorker.js',
325 errorHandler
: e
=> console
.error(e
)
328 expect(pool
.workerNodes
.length
).toBe(numberOfWorkers
)
329 // Simulate worker not found.
330 pool
.removeAllWorker()
331 expect(pool
.workerNodes
.length
).toBe(0)
335 it('Verify that worker pool tasks usage are initialized', async () => {
336 const pool
= new FixedClusterPool(
338 './tests/worker-files/cluster/testWorker.js'
340 for (const workerNode
of pool
.workerNodes
) {
341 expect(workerNode
.tasksUsage
).toStrictEqual({
345 runTimeHistory
: expect
.any(CircularArray
),
349 waitTimeHistory
: expect
.any(CircularArray
),
359 it('Verify that worker pool tasks queue are initialized', async () => {
360 const pool
= new FixedClusterPool(
362 './tests/worker-files/cluster/testWorker.js'
364 for (const workerNode
of pool
.workerNodes
) {
365 expect(workerNode
.tasksQueue
).toBeDefined()
366 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
367 expect(workerNode
.tasksQueue
.size
).toBe(0)
372 it('Verify that worker pool tasks usage are computed', async () => {
373 const pool
= new FixedClusterPool(
375 './tests/worker-files/cluster/testWorker.js'
377 const promises
= new Set()
378 const maxMultiplier
= 2
379 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
380 promises
.add(pool
.execute())
382 for (const workerNode
of pool
.workerNodes
) {
383 expect(workerNode
.tasksUsage
).toStrictEqual({
385 running
: maxMultiplier
,
387 runTimeHistory
: expect
.any(CircularArray
),
391 waitTimeHistory
: expect
.any(CircularArray
),
398 await Promise
.all(promises
)
399 for (const workerNode
of pool
.workerNodes
) {
400 expect(workerNode
.tasksUsage
).toStrictEqual({
404 runTimeHistory
: expect
.any(CircularArray
),
408 waitTimeHistory
: expect
.any(CircularArray
),
418 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
419 const pool
= new DynamicThreadPool(
422 './tests/worker-files/thread/testWorker.js'
424 const promises
= new Set()
425 const maxMultiplier
= 2
426 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
427 promises
.add(pool
.execute())
429 await Promise
.all(promises
)
430 for (const workerNode
of pool
.workerNodes
) {
431 expect(workerNode
.tasksUsage
).toStrictEqual({
432 ran
: expect
.any(Number
),
435 runTimeHistory
: expect
.any(CircularArray
),
439 waitTimeHistory
: expect
.any(CircularArray
),
445 expect(workerNode
.tasksUsage
.ran
).toBeGreaterThan(0)
446 expect(workerNode
.tasksUsage
.ran
).toBeLessThanOrEqual(maxMultiplier
)
448 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
449 for (const workerNode
of pool
.workerNodes
) {
450 expect(workerNode
.tasksUsage
).toStrictEqual({
454 runTimeHistory
: expect
.any(CircularArray
),
458 waitTimeHistory
: expect
.any(CircularArray
),
464 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
465 expect(workerNode
.tasksUsage
.waitTimeHistory
.length
).toBe(0)
470 it("Verify that pool event emitter 'full' event can register a callback", async () => {
471 const pool
= new DynamicThreadPool(
474 './tests/worker-files/thread/testWorker.js'
476 const promises
= new Set()
478 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
479 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
480 promises
.add(pool
.execute())
482 await Promise
.all(promises
)
483 // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
484 // So, for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers, it is emitted numberOfWorkers * 2 times in total.
485 expect(poolFull
).toBe(numberOfWorkers
* 2)
489 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
490 const pool
= new FixedThreadPool(
492 './tests/worker-files/thread/testWorker.js'
494 const promises
= new Set()
496 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
497 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
498 promises
.add(pool
.execute())
500 await Promise
.all(promises
)
501 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
502 // So, for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool, it is emitted numberOfWorkers + 1 times in total.
503 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
507 it('Verify that multiple tasks worker is working', async () => {
508 const pool
= new DynamicClusterPool(
511 './tests/worker-files/cluster/testMultiTasksWorker.js'
513 const data
= { n
: 10 }
514 const result0
= await pool
.execute(data
)
515 expect(result0
).toBe(false)
516 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
517 expect(result1
).toBe(false)
518 const result2
= await pool
.execute(data
, 'factorial')
519 expect(result2
).toBe(3628800)
520 const result3
= await pool
.execute(data
, 'fibonacci')
521 expect(result3
).toBe(89)