1 const { MessageChannel
} = require('worker_threads')
2 const { expect
} = require('expect')
10 WorkerChoiceStrategies
,
12 } = require('../../../lib')
13 const { CircularArray
} = require('../../../lib/circular-array')
14 const { Queue
} = require('../../../lib/queue')
15 const { version
} = require('../../../package.json')
16 const { waitPoolEvents
} = require('../../test-utils')
18 describe('Abstract pool test suite', () => {
19 const numberOfWorkers
= 2
20 class StubPoolWithIsMain
extends FixedThreadPool
{
26 it('Simulate pool creation from a non main thread/process', () => {
29 new StubPoolWithIsMain(
31 './tests/worker-files/thread/testWorker.js',
33 errorHandler
: e
=> console
.error(e
)
36 ).toThrowError('Cannot start a pool from a worker!')
39 it('Verify that filePath is checked', () => {
40 const expectedError
= new Error(
41 'Please specify a file with a worker implementation'
43 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
46 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
49 expect(() => new FixedThreadPool(numberOfWorkers
, 0)).toThrowError(
52 expect(() => new FixedThreadPool(numberOfWorkers
, true)).toThrowError(
56 () => new FixedThreadPool(numberOfWorkers
, './dummyWorker.ts')
57 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
60 it('Verify that numberOfWorkers is checked', () => {
61 expect(() => new FixedThreadPool()).toThrowError(
62 'Cannot instantiate a pool without specifying the number of workers'
66 it('Verify that a negative number of workers is checked', () => {
69 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
72 'Cannot instantiate a pool with a negative number of workers'
77 it('Verify that a non integer number of workers is checked', () => {
80 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
83 'Cannot instantiate a pool with a non safe integer number of workers'
88 it('Verify that dynamic pool sizing is checked', () => {
91 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
94 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
99 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
102 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
107 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
110 'Cannot instantiate a dynamic pool with a pool size equal to zero'
115 it('Verify that pool options are checked', async () => {
116 let pool
= new FixedThreadPool(
118 './tests/worker-files/thread/testWorker.js'
120 expect(pool
.emitter
).toBeDefined()
121 expect(pool
.opts
.enableEvents
).toBe(true)
122 expect(pool
.opts
.restartWorkerOnError
).toBe(true)
123 expect(pool
.opts
.enableTasksQueue
).toBe(false)
124 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
125 expect(pool
.opts
.workerChoiceStrategy
).toBe(
126 WorkerChoiceStrategies
.ROUND_ROBIN
128 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
129 runTime
: { median
: false },
130 waitTime
: { median
: false },
131 elu
: { median
: false }
133 expect(pool
.opts
.messageHandler
).toBeUndefined()
134 expect(pool
.opts
.errorHandler
).toBeUndefined()
135 expect(pool
.opts
.onlineHandler
).toBeUndefined()
136 expect(pool
.opts
.exitHandler
).toBeUndefined()
138 const testHandler
= () => console
.log('test handler executed')
139 pool
= new FixedThreadPool(
141 './tests/worker-files/thread/testWorker.js',
143 workerChoiceStrategy
: WorkerChoiceStrategies
.LEAST_USED
,
144 workerChoiceStrategyOptions
: {
145 runTime
: { median
: true },
146 weights
: { 0: 300, 1: 200 }
149 restartWorkerOnError
: false,
150 enableTasksQueue
: true,
151 tasksQueueOptions
: { concurrency
: 2 },
152 messageHandler
: testHandler
,
153 errorHandler
: testHandler
,
154 onlineHandler
: testHandler
,
155 exitHandler
: testHandler
158 expect(pool
.emitter
).toBeUndefined()
159 expect(pool
.opts
.enableEvents
).toBe(false)
160 expect(pool
.opts
.restartWorkerOnError
).toBe(false)
161 expect(pool
.opts
.enableTasksQueue
).toBe(true)
162 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
163 expect(pool
.opts
.workerChoiceStrategy
).toBe(
164 WorkerChoiceStrategies
.LEAST_USED
166 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
167 runTime
: { median
: true },
168 weights
: { 0: 300, 1: 200 }
170 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
171 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
172 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
173 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
177 it('Verify that pool options are validated', async () => {
182 './tests/worker-files/thread/testWorker.js',
184 workerChoiceStrategy
: 'invalidStrategy'
187 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
192 './tests/worker-files/thread/testWorker.js',
194 workerChoiceStrategyOptions
: 'invalidOptions'
198 'Invalid worker choice strategy options: must be a plain object'
204 './tests/worker-files/thread/testWorker.js',
206 workerChoiceStrategyOptions
: { weights
: {} }
210 'Invalid worker choice strategy options: must have a weight for each worker node'
216 './tests/worker-files/thread/testWorker.js',
218 workerChoiceStrategyOptions
: { measurement
: 'invalidMeasurement' }
222 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
228 './tests/worker-files/thread/testWorker.js',
230 enableTasksQueue
: true,
231 tasksQueueOptions
: { concurrency
: 0 }
234 ).toThrowError("Invalid worker tasks concurrency '0'")
239 './tests/worker-files/thread/testWorker.js',
241 enableTasksQueue
: true,
242 tasksQueueOptions
: 'invalidTasksQueueOptions'
245 ).toThrowError('Invalid tasks queue options: must be a plain object')
250 './tests/worker-files/thread/testWorker.js',
252 enableTasksQueue
: true,
253 tasksQueueOptions
: { concurrency
: 0.2 }
256 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
259 it('Verify that pool worker choice strategy options can be set', async () => {
260 const pool
= new FixedThreadPool(
262 './tests/worker-files/thread/testWorker.js',
263 { workerChoiceStrategy
: WorkerChoiceStrategies
.FAIR_SHARE
}
265 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
266 runTime
: { median
: false },
267 waitTime
: { median
: false },
268 elu
: { median
: false }
270 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
271 .workerChoiceStrategies
) {
272 expect(workerChoiceStrategy
.opts
).toStrictEqual({
273 runTime
: { median
: false },
274 waitTime
: { median
: false },
275 elu
: { median
: false }
279 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
297 pool
.setWorkerChoiceStrategyOptions({
298 runTime
: { median
: true },
299 elu
: { median
: true }
301 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
302 runTime
: { median
: true },
303 elu
: { median
: true }
305 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
306 .workerChoiceStrategies
) {
307 expect(workerChoiceStrategy
.opts
).toStrictEqual({
308 runTime
: { median
: true },
309 elu
: { median
: true }
313 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
331 pool
.setWorkerChoiceStrategyOptions({
332 runTime
: { median
: false },
333 elu
: { median
: false }
335 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
336 runTime
: { median
: false },
337 elu
: { median
: false }
339 for (const [, workerChoiceStrategy
] of pool
.workerChoiceStrategyContext
340 .workerChoiceStrategies
) {
341 expect(workerChoiceStrategy
.opts
).toStrictEqual({
342 runTime
: { median
: false },
343 elu
: { median
: false }
347 pool
.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
366 pool
.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
368 'Invalid worker choice strategy options: must be a plain object'
371 pool
.setWorkerChoiceStrategyOptions({ weights
: {} })
373 'Invalid worker choice strategy options: must have a weight for each worker node'
376 pool
.setWorkerChoiceStrategyOptions({ measurement
: 'invalidMeasurement' })
378 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
383 it('Verify that pool tasks queue can be enabled/disabled', async () => {
384 const pool
= new FixedThreadPool(
386 './tests/worker-files/thread/testWorker.js'
388 expect(pool
.opts
.enableTasksQueue
).toBe(false)
389 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
390 pool
.enableTasksQueue(true)
391 expect(pool
.opts
.enableTasksQueue
).toBe(true)
392 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
393 pool
.enableTasksQueue(true, { concurrency
: 2 })
394 expect(pool
.opts
.enableTasksQueue
).toBe(true)
395 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
396 pool
.enableTasksQueue(false)
397 expect(pool
.opts
.enableTasksQueue
).toBe(false)
398 expect(pool
.opts
.tasksQueueOptions
).toBeUndefined()
402 it('Verify that pool tasks queue options can be set', async () => {
403 const pool
= new FixedThreadPool(
405 './tests/worker-files/thread/testWorker.js',
406 { enableTasksQueue
: true }
408 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 1 })
409 pool
.setTasksQueueOptions({ concurrency
: 2 })
410 expect(pool
.opts
.tasksQueueOptions
).toStrictEqual({ concurrency
: 2 })
412 pool
.setTasksQueueOptions('invalidTasksQueueOptions')
413 ).toThrowError('Invalid tasks queue options: must be a plain object')
414 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0 })).toThrowError(
415 "Invalid worker tasks concurrency '0'"
417 expect(() => pool
.setTasksQueueOptions({ concurrency
: 0.2 })).toThrowError(
418 'Invalid worker tasks concurrency: must be an integer'
423 it('Verify that pool info is set', async () => {
424 let pool
= new FixedThreadPool(
426 './tests/worker-files/thread/testWorker.js'
428 expect(pool
.info
).toStrictEqual({
430 type
: PoolTypes
.fixed
,
431 worker
: WorkerTypes
.thread
,
433 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
434 minSize
: numberOfWorkers
,
435 maxSize
: numberOfWorkers
,
436 workerNodes
: numberOfWorkers
,
437 idleWorkerNodes
: numberOfWorkers
,
446 pool
= new DynamicClusterPool(
447 Math
.floor(numberOfWorkers
/ 2),
449 './tests/worker-files/cluster/testWorker.js'
451 expect(pool
.info
).toStrictEqual({
453 type
: PoolTypes
.dynamic
,
454 worker
: WorkerTypes
.cluster
,
456 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
457 minSize
: Math
.floor(numberOfWorkers
/ 2),
458 maxSize
: numberOfWorkers
,
459 workerNodes
: Math
.floor(numberOfWorkers
/ 2),
460 idleWorkerNodes
: Math
.floor(numberOfWorkers
/ 2),
471 it('Verify that pool worker tasks usage are initialized', async () => {
472 const pool
= new FixedClusterPool(
474 './tests/worker-files/cluster/testWorker.js'
476 for (const workerNode
of pool
.workerNodes
) {
477 expect(workerNode
.usage
).toStrictEqual({
486 history
: expect
.any(CircularArray
)
489 history
: expect
.any(CircularArray
)
493 history
: expect
.any(CircularArray
)
496 history
: expect
.any(CircularArray
)
504 it('Verify that pool worker tasks queue are initialized', async () => {
505 let pool
= new FixedClusterPool(
507 './tests/worker-files/cluster/testWorker.js'
509 for (const workerNode
of pool
.workerNodes
) {
510 expect(workerNode
.tasksQueue
).toBeDefined()
511 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
512 expect(workerNode
.tasksQueue
.size
).toBe(0)
513 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
516 pool
= new DynamicThreadPool(
517 Math
.floor(numberOfWorkers
/ 2),
519 './tests/worker-files/thread/testWorker.js'
521 for (const workerNode
of pool
.workerNodes
) {
522 expect(workerNode
.tasksQueue
).toBeDefined()
523 expect(workerNode
.tasksQueue
).toBeInstanceOf(Queue
)
524 expect(workerNode
.tasksQueue
.size
).toBe(0)
525 expect(workerNode
.tasksQueue
.maxSize
).toBe(0)
529 it('Verify that pool worker info are initialized', async () => {
530 let pool
= new FixedClusterPool(
532 './tests/worker-files/cluster/testWorker.js'
534 for (const workerNode
of pool
.workerNodes
) {
535 expect(workerNode
.info
).toStrictEqual({
536 id
: expect
.any(Number
),
537 type
: WorkerTypes
.cluster
,
543 pool
= new DynamicThreadPool(
544 Math
.floor(numberOfWorkers
/ 2),
546 './tests/worker-files/thread/testWorker.js'
548 for (const workerNode
of pool
.workerNodes
) {
549 expect(workerNode
.info
).toStrictEqual({
550 id
: expect
.any(Number
),
551 type
: WorkerTypes
.thread
,
554 messageChannel
: expect
.any(MessageChannel
)
559 it('Verify that pool worker tasks usage are computed', async () => {
560 const pool
= new FixedClusterPool(
562 './tests/worker-files/cluster/testWorker.js'
564 const promises
= new Set()
565 const maxMultiplier
= 2
566 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
567 promises
.add(pool
.execute())
569 for (const workerNode
of pool
.workerNodes
) {
570 expect(workerNode
.usage
).toStrictEqual({
573 executing
: maxMultiplier
,
579 history
: expect
.any(CircularArray
)
582 history
: expect
.any(CircularArray
)
586 history
: expect
.any(CircularArray
)
589 history
: expect
.any(CircularArray
)
594 await Promise
.all(promises
)
595 for (const workerNode
of pool
.workerNodes
) {
596 expect(workerNode
.usage
).toStrictEqual({
598 executed
: maxMultiplier
,
605 history
: expect
.any(CircularArray
)
608 history
: expect
.any(CircularArray
)
612 history
: expect
.any(CircularArray
)
615 history
: expect
.any(CircularArray
)
623 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
624 const pool
= new DynamicThreadPool(
625 Math
.floor(numberOfWorkers
/ 2),
627 './tests/worker-files/thread/testWorker.js'
629 const promises
= new Set()
630 const maxMultiplier
= 2
631 for (let i
= 0; i
< numberOfWorkers
* maxMultiplier
; i
++) {
632 promises
.add(pool
.execute())
634 await Promise
.all(promises
)
635 for (const workerNode
of pool
.workerNodes
) {
636 expect(workerNode
.usage
).toStrictEqual({
638 executed
: expect
.any(Number
),
645 history
: expect
.any(CircularArray
)
648 history
: expect
.any(CircularArray
)
652 history
: expect
.any(CircularArray
)
655 history
: expect
.any(CircularArray
)
659 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
660 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
661 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
662 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
663 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
664 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
666 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
667 for (const workerNode
of pool
.workerNodes
) {
668 expect(workerNode
.usage
).toStrictEqual({
677 history
: expect
.any(CircularArray
)
680 history
: expect
.any(CircularArray
)
684 history
: expect
.any(CircularArray
)
687 history
: expect
.any(CircularArray
)
691 expect(workerNode
.usage
.runTime
.history
.length
).toBe(0)
692 expect(workerNode
.usage
.waitTime
.history
.length
).toBe(0)
693 expect(workerNode
.usage
.elu
.idle
.history
.length
).toBe(0)
694 expect(workerNode
.usage
.elu
.active
.history
.length
).toBe(0)
699 it("Verify that pool event emitter 'full' event can register a callback", async () => {
700 const pool
= new DynamicThreadPool(
701 Math
.floor(numberOfWorkers
/ 2),
703 './tests/worker-files/thread/testWorker.js'
705 const promises
= new Set()
708 pool
.emitter
.on(PoolEvents
.full
, info
=> {
712 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
713 promises
.add(pool
.execute())
715 await Promise
.all(promises
)
716 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
717 // So it fires numberOfWorkers * 2 - 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
718 expect(poolFull
).toBe(numberOfWorkers
* 2 - 1)
719 expect(poolInfo
).toStrictEqual({
721 type
: PoolTypes
.dynamic
,
722 worker
: WorkerTypes
.thread
,
723 ready
: expect
.any(Boolean
),
724 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
725 minSize
: expect
.any(Number
),
726 maxSize
: expect
.any(Number
),
727 workerNodes
: expect
.any(Number
),
728 idleWorkerNodes
: expect
.any(Number
),
729 busyWorkerNodes
: expect
.any(Number
),
730 executedTasks
: expect
.any(Number
),
731 executingTasks
: expect
.any(Number
),
732 queuedTasks
: expect
.any(Number
),
733 maxQueuedTasks
: expect
.any(Number
),
734 failedTasks
: expect
.any(Number
)
739 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
740 const pool
= new DynamicClusterPool(
741 Math
.floor(numberOfWorkers
/ 2),
743 './tests/worker-files/cluster/testWorker.js'
747 pool
.emitter
.on(PoolEvents
.ready
, info
=> {
751 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
752 expect(poolReady
).toBe(1)
753 expect(poolInfo
).toStrictEqual({
755 type
: PoolTypes
.dynamic
,
756 worker
: WorkerTypes
.cluster
,
758 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
759 minSize
: expect
.any(Number
),
760 maxSize
: expect
.any(Number
),
761 workerNodes
: expect
.any(Number
),
762 idleWorkerNodes
: expect
.any(Number
),
763 busyWorkerNodes
: expect
.any(Number
),
764 executedTasks
: expect
.any(Number
),
765 executingTasks
: expect
.any(Number
),
766 queuedTasks
: expect
.any(Number
),
767 maxQueuedTasks
: expect
.any(Number
),
768 failedTasks
: expect
.any(Number
)
773 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
774 const pool
= new FixedThreadPool(
776 './tests/worker-files/thread/testWorker.js'
778 const promises
= new Set()
781 pool
.emitter
.on(PoolEvents
.busy
, info
=> {
785 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
786 promises
.add(pool
.execute())
788 await Promise
.all(promises
)
789 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
790 // So it fires numberOfWorkers + 1 times in total for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
791 expect(poolBusy
).toBe(numberOfWorkers
+ 1)
792 expect(poolInfo
).toStrictEqual({
794 type
: PoolTypes
.fixed
,
795 worker
: WorkerTypes
.thread
,
796 ready
: expect
.any(Boolean
),
797 strategy
: WorkerChoiceStrategies
.ROUND_ROBIN
,
798 minSize
: expect
.any(Number
),
799 maxSize
: expect
.any(Number
),
800 workerNodes
: expect
.any(Number
),
801 idleWorkerNodes
: expect
.any(Number
),
802 busyWorkerNodes
: expect
.any(Number
),
803 executedTasks
: expect
.any(Number
),
804 executingTasks
: expect
.any(Number
),
805 queuedTasks
: expect
.any(Number
),
806 maxQueuedTasks
: expect
.any(Number
),
807 failedTasks
: expect
.any(Number
)
812 it('Verify that multiple tasks worker is working', async () => {
813 const pool
= new DynamicClusterPool(
814 Math
.floor(numberOfWorkers
/ 2),
816 './tests/worker-files/cluster/testMultiTasksWorker.js'
818 const data
= { n
: 10 }
819 const result0
= await pool
.execute(data
)
820 expect(result0
).toStrictEqual({ ok
: 1 })
821 const result1
= await pool
.execute(data
, 'jsonIntegerSerialization')
822 expect(result1
).toStrictEqual({ ok
: 1 })
823 const result2
= await pool
.execute(data
, 'factorial')
824 expect(result2
).toBe(3628800)
825 const result3
= await pool
.execute(data
, 'fibonacci')
826 expect(result3
).toBe(55)