1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
11 describe('Abstract pool test suite', () => {
// Suite-wide fixtures.
// numberOfWorkers is the worker count used by the tests below (pool constructor argument,
// loop bounds and expected counter values).
// workerNotFoundInPoolError is the error the "worker not found" test expects to be thrown.
// NOTE(review): the closing of the Error(...) call is not visible in this excerpt.
12 const numberOfWorkers
= 1
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool worker nodes'
// Test double extending FixedThreadPool.
// Only the promiseResponseMap.clear() call of its body is visible in this excerpt. A test
// below calls removeAllWorker() on an instance, so presumably that method is defined
// here and this call belongs to it — TODO confirm against the full file.
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
// Test double extending FixedThreadPool, instantiated by the
// "Simulate pool creation from a non main thread/process" test.
// NOTE(review): the class body is not visible in this excerpt; judging by the name it
// presumably overrides the pool's main-thread check — confirm against the full file.
22 class StubPoolWithIsMain
extends FixedThreadPool
{
// Constructing a StubPoolWithIsMain (thread test worker file, errorHandler that logs through
// console.error) is expected to throw 'Cannot start a pool from a worker!'.
// NOTE(review): the wrapping expect(...) call and the first constructor argument are not
// visible in this excerpt.
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError(new Error('Cannot start a pool from a worker!'))
// A FixedThreadPool must refuse to be built without a worker file: both the "no file path"
// and the "empty string file path" constructions are expected to throw.
// NOTE(review): the toThrowError arguments are not visible in this excerpt; presumably
// both receive expectedError — confirm.
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
// Case 1: file path omitted entirely.
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
// Case 2: file path given as an empty string.
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
// Constructing a FixedThreadPool with no arguments at all is expected to throw.
// The message below is presumably wrapped in the expected Error — the wrapping lines are
// not visible in this excerpt.
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
// A FixedClusterPool built with -1 workers is expected to be rejected.
// NOTE(review): the expect(...)/toThrowError(...) wrapper lines are not visible in this
// excerpt; the message below is presumably the expected error text.
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
// A FixedThreadPool built with a fractional worker count (0.25) is expected to be rejected.
// NOTE(review): the expect(...)/toThrowError(...) wrapper lines are not visible in this
// excerpt; the message below is presumably the expected error text.
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non integer number of workers'
// Checks pool.opts in two phases:
//   1. a FixedThreadPool built without explicit handlers/strategy exposes the defaults
//      asserted below (events enabled, emitter present, ROUND_ROBIN strategy, no handlers);
//   2. a FixedThreadPool built with explicit options reflects each of them on pool.opts.
// NOTE(review): several constructor-argument lines are not visible in this excerpt.
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
// Phase 1: default option values.
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.emitter
).toBeDefined()
90 expect(pool
.opts
.workerChoiceStrategy
).toBe(
91 WorkerChoiceStrategies
.ROUND_ROBIN
93 expect(pool
.opts
.messageHandler
).toBeUndefined()
94 expect(pool
.opts
.errorHandler
).toBeUndefined()
95 expect(pool
.opts
.onlineHandler
).toBeUndefined()
96 expect(pool
.opts
.exitHandler
).toBeUndefined()
// Single handler reused for every callback option so identity can be asserted afterwards.
98 const testHandler
= () => console
.log('test handler executed')
// Phase 2: rebuild the pool with explicit options.
99 pool
= new FixedThreadPool(
101 './tests/worker-files/thread/testWorker.js',
103 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
105 messageHandler
: testHandler
,
106 errorHandler
: testHandler
,
107 onlineHandler
: testHandler
,
108 exitHandler
: testHandler
// Events are expected to be off and no emitter created; presumably enableEvents: false is
// passed in the options above — that option line is not visible in this excerpt.
111 expect(pool
.opts
.enableEvents
).toBe(false)
112 expect(pool
.emitter
).toBeUndefined()
113 expect(pool
.opts
.workerChoiceStrategy
).toBe(
114 WorkerChoiceStrategies
.LESS_USED
116 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
117 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
118 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
119 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
// Builds a StubPoolWithRemoveAllWorker, calls removeAllWorker() on it, and expects a
// subsequent getWorkerTasksUsage() call to throw workerNotFoundInPoolError.
// NOTE(review): the first constructor argument and some closing lines are not visible here.
123 it('Simulate worker not found during getWorkerTasksUsage', async () => {
124 const pool
= new StubPoolWithRemoveAllWorker(
126 './tests/worker-files/cluster/testWorker.js',
128 errorHandler
: e
=> console
.error(e
)
131 // Simulate worker not found.
132 pool
.removeAllWorker()
133 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
134 workerNotFoundInPoolError
// On a freshly created FixedClusterPool, every worker node's tasksUsage must exist with all
// counters at zero and an empty CircularArray as run time history.
// NOTE(review): the first constructor argument is not visible in this excerpt.
139 it('Verify that worker pool tasks usage are initialized', async () => {
140 const pool
= new FixedClusterPool(
142 './tests/worker-files/cluster/testWorker.js'
144 for (const workerNode
of pool
.workerNodes
) {
145 expect(workerNode
.tasksUsage
).toBeDefined()
146 expect(workerNode
.tasksUsage
.run
).toBe(0)
147 expect(workerNode
.tasksUsage
.running
).toBe(0)
148 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
149 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
150 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
151 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
152 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
153 expect(workerNode
.tasksUsage
.error
).toBe(0)
// On a freshly created FixedClusterPool, every worker node's tasksQueue must be defined,
// be an Array, and be empty.
// NOTE(review): the first constructor argument is not visible in this excerpt.
158 it('Verify that worker pool tasks queue are initialized', async () => {
159 const pool
= new FixedClusterPool(
161 './tests/worker-files/cluster/testWorker.js'
163 for (const workerNode
of pool
.workerNodes
) {
164 expect(workerNode
.tasksQueue
).toBeDefined()
165 expect(workerNode
.tasksQueue
).toBeInstanceOf(Array
)
166 expect(workerNode
.tasksQueue
.length
).toBe(0)
// Submits numberOfWorkers * 2 tasks to a FixedClusterPool and checks tasksUsage twice:
//   - before the tasks settle: `running` equals the number of submitted tasks, `run` is 0;
//   - after Promise.all: `run` equals the number of submitted tasks, `running` is back to 0.
// NOTE(review): the declaration of `promises` and the first constructor argument are not
// visible in this excerpt.
171 it('Verify that worker pool tasks usage are computed', async () => {
172 const pool
= new FixedClusterPool(
174 './tests/worker-files/cluster/testWorker.js'
177 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
178 promises
.push(pool
.execute())
// Tasks are submitted but not awaited yet: they must all be accounted as running.
180 for (const workerNode
of pool
.workerNodes
) {
181 expect(workerNode
.tasksUsage
).toBeDefined()
182 expect(workerNode
.tasksUsage
.run
).toBe(0)
183 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
184 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
185 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
186 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
187 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
188 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
189 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Wait for every task to settle, then re-check the counters.
191 await Promise
.all(promises
)
192 for (const workerNode
of pool
.workerNodes
) {
193 expect(workerNode
.tasksUsage
).toBeDefined()
194 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
195 expect(workerNode
.tasksUsage
.running
).toBe(0)
196 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
197 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
198 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
199 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
200 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
201 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Runs numberOfWorkers * 2 tasks on a DynamicThreadPool, checks the resulting tasksUsage
// counters, then switches the worker choice strategy to FAIR_SHARE and expects every
// counter on every worker node to be back at zero.
// NOTE(review): the declaration of `promises` and the leading constructor arguments are
// not visible in this excerpt.
206 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
207 const pool
= new DynamicThreadPool(
210 './tests/worker-files/thread/testWorker.js'
213 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
214 promises
.push(pool
.execute())
216 await Promise
.all(promises
)
// Usage after all tasks have completed.
217 for (const workerNode
of pool
.workerNodes
) {
218 expect(workerNode
.tasksUsage
).toBeDefined()
219 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
220 expect(workerNode
.tasksUsage
.running
).toBe(0)
221 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
222 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
223 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
224 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
225 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
226 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Changing the strategy is expected to reset the usage statistics.
228 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
229 for (const workerNode
of pool
.workerNodes
) {
230 expect(workerNode
.tasksUsage
).toBeDefined()
231 expect(workerNode
.tasksUsage
.run
).toBe(0)
232 expect(workerNode
.tasksUsage
.running
).toBe(0)
233 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
234 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
235 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
236 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
237 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
238 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Registers a listener for PoolEvents.full on a DynamicThreadPool's emitter, submits
// numberOfWorkers * 2 tasks, waits for them, and checks how many times the listener fired.
// NOTE(review): the declarations of `promises` and `poolFull` and the leading constructor
// arguments are not visible in this excerpt.
243 it("Verify that pool event emitter 'full' event can register a callback", async () => {
244 const pool
= new DynamicThreadPool(
247 './tests/worker-files/thread/testWorker.js'
// Count every emission of the event.
251 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
252 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
253 promises
.push(pool
.execute())
255 await Promise
.all(promises
)
256 // The `full` event is triggered when the number of tasks submitted at once reaches the number of dynamic pool workers.
257 // So in total it fires numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
258 expect(poolFull
).toBe(numberOfWorkers
+ 1)
// Registers a listener for PoolEvents.busy on a FixedThreadPool's emitter, submits
// numberOfWorkers * 2 tasks, waits for them, and checks how many times the listener fired.
// NOTE(review): the declarations of `promises` and `poolBusy` and the first constructor
// argument are not visible in this excerpt.
262 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
263 const pool
= new FixedThreadPool(
265 './tests/worker-files/thread/testWorker.js'
// Count every emission of the event.
269 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
270 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
271 promises
.push(pool
.execute())
273 await Promise
.all(promises
)
274 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
275 // So in total it fires numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
276 expect(poolBusy
).toBe(numberOfWorkers
+ 1)