1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
11 describe('Abstract pool test suite', () => {
12 const numberOfWorkers
= 1
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool worker nodes'
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
22 class StubPoolWithIsMain
extends FixedThreadPool
{
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError(new Error('Cannot start a pool from a worker!'))
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.emitter
).toBeDefined()
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.workerChoiceStrategy
).toBe(
92 WorkerChoiceStrategies
.ROUND_ROBIN
94 expect(pool
.opts
.messageHandler
).toBeUndefined()
95 expect(pool
.opts
.errorHandler
).toBeUndefined()
96 expect(pool
.opts
.onlineHandler
).toBeUndefined()
97 expect(pool
.opts
.exitHandler
).toBeUndefined()
99 const testHandler
= () => console
.log('test handler executed')
100 pool
= new FixedThreadPool(
102 './tests/worker-files/thread/testWorker.js',
104 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
106 enableTasksQueue
: true,
107 messageHandler
: testHandler
,
108 errorHandler
: testHandler
,
109 onlineHandler
: testHandler
,
110 exitHandler
: testHandler
113 expect(pool
.opts
.enableEvents
).toBe(false)
114 expect(pool
.emitter
).toBeUndefined()
115 expect(pool
.opts
.enableTasksQueue
).toBe(true)
116 expect(pool
.opts
.workerChoiceStrategy
).toBe(
117 WorkerChoiceStrategies
.LESS_USED
119 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
120 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
121 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
122 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
126 it('Simulate worker not found during getWorkerTasksUsage', async () => {
127 const pool
= new StubPoolWithRemoveAllWorker(
129 './tests/worker-files/cluster/testWorker.js',
131 errorHandler
: e
=> console
.error(e
)
134 // Simulate worker not found.
135 pool
.removeAllWorker()
136 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
137 workerNotFoundInPoolError
142 it('Verify that worker pool tasks usage are initialized', async () => {
143 const pool
= new FixedClusterPool(
145 './tests/worker-files/cluster/testWorker.js'
147 for (const workerNode
of pool
.workerNodes
) {
148 expect(workerNode
.tasksUsage
).toBeDefined()
149 expect(workerNode
.tasksUsage
.run
).toBe(0)
150 expect(workerNode
.tasksUsage
.running
).toBe(0)
151 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
152 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
153 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
154 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
155 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
156 expect(workerNode
.tasksUsage
.error
).toBe(0)
161 it('Verify that worker pool tasks queue are initialized', async () => {
162 const pool
= new FixedClusterPool(
164 './tests/worker-files/cluster/testWorker.js'
166 for (const workerNode
of pool
.workerNodes
) {
167 expect(workerNode
.tasksQueue
).toBeDefined()
168 expect(workerNode
.tasksQueue
).toBeInstanceOf(Array
)
169 expect(workerNode
.tasksQueue
.length
).toBe(0)
174 it('Verify that worker pool tasks usage are computed', async () => {
175 const pool
= new FixedClusterPool(
177 './tests/worker-files/cluster/testWorker.js'
180 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
181 promises
.push(pool
.execute())
183 for (const workerNode
of pool
.workerNodes
) {
184 expect(workerNode
.tasksUsage
).toBeDefined()
185 expect(workerNode
.tasksUsage
.run
).toBe(0)
186 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
187 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
188 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
189 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
190 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
191 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
192 expect(workerNode
.tasksUsage
.error
).toBe(0)
194 await Promise
.all(promises
)
195 for (const workerNode
of pool
.workerNodes
) {
196 expect(workerNode
.tasksUsage
).toBeDefined()
197 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
198 expect(workerNode
.tasksUsage
.running
).toBe(0)
199 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
200 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
201 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
202 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
203 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
204 expect(workerNode
.tasksUsage
.error
).toBe(0)
209 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
210 const pool
= new DynamicThreadPool(
213 './tests/worker-files/thread/testWorker.js'
216 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
217 promises
.push(pool
.execute())
219 await Promise
.all(promises
)
220 for (const workerNode
of pool
.workerNodes
) {
221 expect(workerNode
.tasksUsage
).toBeDefined()
222 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
223 expect(workerNode
.tasksUsage
.running
).toBe(0)
224 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
225 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
226 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
227 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
228 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
229 expect(workerNode
.tasksUsage
.error
).toBe(0)
231 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
232 for (const workerNode
of pool
.workerNodes
) {
233 expect(workerNode
.tasksUsage
).toBeDefined()
234 expect(workerNode
.tasksUsage
.run
).toBe(0)
235 expect(workerNode
.tasksUsage
.running
).toBe(0)
236 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
237 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
238 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
239 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
240 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
241 expect(workerNode
.tasksUsage
.error
).toBe(0)
246 it("Verify that pool event emitter 'full' event can register a callback", async () => {
247 const pool
= new DynamicThreadPool(
250 './tests/worker-files/thread/testWorker.js'
254 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
255 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
256 promises
.push(pool
.execute())
258 await Promise
.all(promises
)
259 // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
260 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
261 expect(poolFull
).toBe(numberOfWorkers
+ 1)
265 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
266 const pool
= new FixedThreadPool(
268 './tests/worker-files/thread/testWorker.js'
272 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
273 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
274 promises
.push(pool
.execute())
276 await Promise
.all(promises
)
277 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
278 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
279 expect(poolBusy
).toBe(numberOfWorkers
+ 1)