1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
// Suite grouping the abstract pool tests.
11 describe('Abstract pool test suite', () => {
// Worker count handed to the pool constructors in the cases of this suite.
12 const numberOfWorkers
= 1
// Error instance compared against what getWorkerTasksUsage() throws after
// removeAllWorker() was called on the stub pool.
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool worker nodes'
// Test stub derived from FixedThreadPool.
// NOTE(review): the member header is not visible in this excerpt; a later case calls
// removeAllWorker() on an instance, so the statement below presumably belongs to
// that method - confirm against the full file.
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
// Calls clear() on promiseResponseMap.
19 this.promiseResponseMap
.clear()
// Test stub derived from FixedThreadPool; its body is not visible in this excerpt.
// NOTE(review): presumably it overrides the main thread/process check exercised by
// the non main thread/process case - confirm against the full file.
22 class StubPoolWithIsMain
extends FixedThreadPool
{
// Constructing StubPoolWithIsMain with the thread test worker is expected to throw
// the error whose message says a pool cannot be started from a worker.
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
// The errorHandler option just forwards its argument to console.error.
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError(new Error('Cannot start a pool from a worker!'))
// Checks that the FixedThreadPool constructor throws when the worker file is missing:
// once with no file argument at all and once with an empty string.
// NOTE(review): the toThrowError arguments are not visible in this excerpt;
// presumably expectedError - confirm.
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
// No file path argument.
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
// Empty string as file path.
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
// Calling the FixedThreadPool constructor with no arguments is expected to throw.
// NOTE(review): the string on the last visible line is presumably the expected
// error message; the wrapping lines are not visible here - confirm.
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
// Builds a FixedClusterPool with -1 workers and the cluster test worker file.
// NOTE(review): the expect/toThrowError wrapper lines are not visible in this excerpt;
// the string below is presumably the expected error message - confirm.
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
// Builds a FixedThreadPool with 0.25 workers and the thread test worker file.
// NOTE(review): the expect/toThrowError wrapper lines are not visible in this excerpt;
// the string below is presumably the expected error message - confirm.
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non integer number of workers'
// Checks pool.opts and pool.emitter twice: first on a pool built without any visible
// options, then on a pool built with explicit options.
// NOTE(review): some argument lines are not visible in this excerpt (for example the
// worker count, the expected workerChoiceStrategyOptions objects and whatever option
// turns events off for the second pool) - confirm against the full file.
83 it('Verify that pool options are checked', async () => {
// First pool: only the worker file is visible among the constructor arguments.
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
// Expected state of the first pool: events on, emitter present, tasks queue off,
// ROUND_ROBIN strategy, no handlers set.
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.emitter
).toBeDefined()
90 expect(pool
.opts
.enableTasksQueue
).toBe(false)
91 expect(pool
.opts
.workerChoiceStrategy
).toBe(
92 WorkerChoiceStrategies
.ROUND_ROBIN
94 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
97 expect(pool
.opts
.messageHandler
).toBeUndefined()
98 expect(pool
.opts
.errorHandler
).toBeUndefined()
99 expect(pool
.opts
.onlineHandler
).toBeUndefined()
100 expect(pool
.opts
.exitHandler
).toBeUndefined()
// Single handler function reused for every handler option of the second pool.
102 const testHandler
= () => console
.log('test handler executed')
// Second pool: LESS_USED strategy, medRunTime strategy option, tasks queue on and
// testHandler as message, error, online and exit handler.
103 pool
= new FixedThreadPool(
105 './tests/worker-files/thread/testWorker.js',
107 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
108 workerChoiceStrategyOptions
: { medRunTime
: true },
110 enableTasksQueue
: true,
111 messageHandler
: testHandler
,
112 errorHandler
: testHandler
,
113 onlineHandler
: testHandler
,
114 exitHandler
: testHandler
// Expected state of the second pool: events off, no emitter, tasks queue on,
// LESS_USED strategy, every handler equal to testHandler.
117 expect(pool
.opts
.enableEvents
).toBe(false)
118 expect(pool
.emitter
).toBeUndefined()
119 expect(pool
.opts
.enableTasksQueue
).toBe(true)
120 expect(pool
.opts
.workerChoiceStrategy
).toBe(
121 WorkerChoiceStrategies
.LESS_USED
123 expect(pool
.opts
.workerChoiceStrategyOptions
).toStrictEqual({
126 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
127 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
128 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
129 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
// Builds the StubPoolWithRemoveAllWorker stub, calls removeAllWorker() on it and then
// expects getWorkerTasksUsage() to throw workerNotFoundInPoolError.
// NOTE(review): the stub extends FixedThreadPool but is given the cluster test worker
// file here - confirm this is intended.
133 it('Simulate worker not found during getWorkerTasksUsage', async () => {
134 const pool
= new StubPoolWithRemoveAllWorker(
136 './tests/worker-files/cluster/testWorker.js',
// The errorHandler option just forwards its argument to console.error.
138 errorHandler
: e
=> console
.error(e
)
141 // Simulate worker not found.
142 pool
.removeAllWorker()
143 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
144 workerNotFoundInPoolError
// On a freshly built FixedClusterPool, every worker node is expected to expose a
// tasksUsage object whose counters are all 0 and whose runTimeHistory is an empty
// CircularArray.
149 it('Verify that worker pool tasks usage are initialized', async () => {
150 const pool
= new FixedClusterPool(
152 './tests/worker-files/cluster/testWorker.js'
// No task has been submitted at this point in the visible code.
154 for (const workerNode
of pool
.workerNodes
) {
155 expect(workerNode
.tasksUsage
).toBeDefined()
156 expect(workerNode
.tasksUsage
.run
).toBe(0)
157 expect(workerNode
.tasksUsage
.running
).toBe(0)
158 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
159 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
160 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
161 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
162 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
163 expect(workerNode
.tasksUsage
.error
).toBe(0)
// On a freshly built FixedClusterPool, every worker node is expected to expose a
// tasksQueue that is an Array of length 0.
168 it('Verify that worker pool tasks queue are initialized', async () => {
169 const pool
= new FixedClusterPool(
171 './tests/worker-files/cluster/testWorker.js'
173 for (const workerNode
of pool
.workerNodes
) {
174 expect(workerNode
.tasksQueue
).toBeDefined()
175 expect(workerNode
.tasksQueue
).toBeInstanceOf(Array
)
176 expect(workerNode
.tasksQueue
.length
).toBe(0)
// Submits numberOfWorkers * 2 tasks to a FixedClusterPool and inspects tasksUsage of
// every worker node twice: before the returned promises are awaited and after.
// NOTE(review): the declaration of promises is not visible in this excerpt.
181 it('Verify that worker pool tasks usage are computed', async () => {
182 const pool
= new FixedClusterPool(
184 './tests/worker-files/cluster/testWorker.js'
// Queue the tasks without awaiting them; each execute() result is collected.
187 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
188 promises
.push(pool
.execute())
// Before awaiting: nothing counted as run yet, numberOfWorkers * 2 counted as running.
190 for (const workerNode
of pool
.workerNodes
) {
191 expect(workerNode
.tasksUsage
).toBeDefined()
192 expect(workerNode
.tasksUsage
.run
).toBe(0)
193 expect(workerNode
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
194 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
195 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
196 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
197 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
198 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
199 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Wait for all submitted tasks.
201 await Promise
.all(promises
)
// After awaiting: numberOfWorkers * 2 counted as run, none running; runTime and
// avgRunTime only need to be non-negative, runTimeHistory is still expected empty.
202 for (const workerNode
of pool
.workerNodes
) {
203 expect(workerNode
.tasksUsage
).toBeDefined()
204 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
205 expect(workerNode
.tasksUsage
.running
).toBe(0)
206 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
207 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
208 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
209 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
210 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
211 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Runs numberOfWorkers * 2 tasks on a DynamicThreadPool, checks the resulting
// tasksUsage, then switches the worker choice strategy to FAIR_SHARE and expects
// every tasksUsage counter to be back at 0.
// NOTE(review): the pool size arguments and the declaration of promises are not
// visible in this excerpt.
216 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
217 const pool
= new DynamicThreadPool(
220 './tests/worker-files/thread/testWorker.js'
// Submit the tasks and wait for all of them.
223 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
224 promises
.push(pool
.execute())
226 await Promise
.all(promises
)
// Usage after the tasks completed: numberOfWorkers * 2 counted as run, none running.
227 for (const workerNode
of pool
.workerNodes
) {
228 expect(workerNode
.tasksUsage
).toBeDefined()
229 expect(workerNode
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
230 expect(workerNode
.tasksUsage
.running
).toBe(0)
231 expect(workerNode
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
232 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
233 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
234 expect(workerNode
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
235 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
236 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Change the strategy; the checks below expect all usage counters to read 0 afterwards.
238 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
239 for (const workerNode
of pool
.workerNodes
) {
240 expect(workerNode
.tasksUsage
).toBeDefined()
241 expect(workerNode
.tasksUsage
.run
).toBe(0)
242 expect(workerNode
.tasksUsage
.running
).toBe(0)
243 expect(workerNode
.tasksUsage
.runTime
).toBe(0)
244 expect(workerNode
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
245 expect(workerNode
.tasksUsage
.runTimeHistory
.length
).toBe(0)
246 expect(workerNode
.tasksUsage
.avgRunTime
).toBe(0)
247 expect(workerNode
.tasksUsage
.medRunTime
).toBe(0)
248 expect(workerNode
.tasksUsage
.error
).toBe(0)
// Registers a listener for PoolEvents.full on the emitter of a DynamicThreadPool,
// runs numberOfWorkers * 2 tasks and checks how many times the listener fired.
// NOTE(review): the pool size arguments and the declarations of promises and poolFull
// are not visible in this excerpt.
253 it("Verify that pool event emitter 'full' event can register a callback", async () => {
254 const pool
= new DynamicThreadPool(
257 './tests/worker-files/thread/testWorker.js'
// Each emitted event increments the poolFull counter.
261 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
262 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
263 promises
.push(pool
.execute())
265 await Promise
.all(promises
)
266 // The `full` event is triggered when the number of tasks submitted at once reaches the number of dynamic pool workers.
267 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
268 expect(poolFull
).toBe(numberOfWorkers
+ 1)
// Registers a listener for PoolEvents.busy on the emitter of a FixedThreadPool,
// runs numberOfWorkers * 2 tasks and checks how many times the listener fired.
// NOTE(review): the worker count argument and the declarations of promises and
// poolBusy are not visible in this excerpt.
272 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
273 const pool
= new FixedThreadPool(
275 './tests/worker-files/thread/testWorker.js'
// Each emitted event increments the poolBusy counter.
279 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
280 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
281 promises
.push(pool
.execute())
283 await Promise
.all(promises
)
284 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
285 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
286 expect(poolBusy
).toBe(numberOfWorkers
+ 1)