1 const { expect
} = require('expect')
8 } = require('../../../lib/index')
9 const { CircularArray
} = require('../../../lib/circular-array')
11 describe('Abstract pool test suite', () => {
12 const numberOfWorkers
= 1
13 const workerNotFoundInPoolError
= new Error(
14 'Worker could not be found in the pool'
16 class StubPoolWithRemoveAllWorker
extends FixedThreadPool
{
19 this.promiseResponseMap
.clear()
22 class StubPoolWithIsMain
extends FixedThreadPool
{
28 it('Simulate pool creation from a non main thread/process', () => {
31 new StubPoolWithIsMain(
33 './tests/worker-files/thread/testWorker.js',
35 errorHandler
: e
=> console
.error(e
)
38 ).toThrowError(new Error('Cannot start a pool from a worker!'))
41 it('Verify that filePath is checked', () => {
42 const expectedError
= new Error(
43 'Please specify a file with a worker implementation'
45 expect(() => new FixedThreadPool(numberOfWorkers
)).toThrowError(
48 expect(() => new FixedThreadPool(numberOfWorkers
, '')).toThrowError(
53 it('Verify that numberOfWorkers is checked', () => {
54 expect(() => new FixedThreadPool()).toThrowError(
56 'Cannot instantiate a pool without specifying the number of workers'
61 it('Verify that a negative number of workers is checked', () => {
64 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 'Cannot instantiate a pool with a negative number of workers'
72 it('Verify that a non integer number of workers is checked', () => {
75 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 'Cannot instantiate a pool with a non integer number of workers'
83 it('Verify that pool options are checked', async () => {
84 let pool
= new FixedThreadPool(
86 './tests/worker-files/thread/testWorker.js'
88 expect(pool
.opts
.enableEvents
).toBe(true)
89 expect(pool
.emitter
).toBeDefined()
90 expect(pool
.opts
.workerChoiceStrategy
).toBe(
91 WorkerChoiceStrategies
.ROUND_ROBIN
93 expect(pool
.opts
.messageHandler
).toBeUndefined()
94 expect(pool
.opts
.errorHandler
).toBeUndefined()
95 expect(pool
.opts
.onlineHandler
).toBeUndefined()
96 expect(pool
.opts
.exitHandler
).toBeUndefined()
98 const testHandler
= () => console
.log('test handler executed')
99 pool
= new FixedThreadPool(
101 './tests/worker-files/thread/testWorker.js',
103 workerChoiceStrategy
: WorkerChoiceStrategies
.LESS_USED
,
105 messageHandler
: testHandler
,
106 errorHandler
: testHandler
,
107 onlineHandler
: testHandler
,
108 exitHandler
: testHandler
111 expect(pool
.opts
.enableEvents
).toBe(false)
112 expect(pool
.emitter
).toBeUndefined()
113 expect(pool
.opts
.workerChoiceStrategy
).toBe(
114 WorkerChoiceStrategies
.LESS_USED
116 expect(pool
.opts
.messageHandler
).toStrictEqual(testHandler
)
117 expect(pool
.opts
.errorHandler
).toStrictEqual(testHandler
)
118 expect(pool
.opts
.onlineHandler
).toStrictEqual(testHandler
)
119 expect(pool
.opts
.exitHandler
).toStrictEqual(testHandler
)
123 it('Simulate worker not found during getWorkerTasksUsage', async () => {
124 const pool
= new StubPoolWithRemoveAllWorker(
126 './tests/worker-files/cluster/testWorker.js',
128 errorHandler
: e
=> console
.error(e
)
131 // Simulate worker not found.
132 pool
.removeAllWorker()
133 expect(() => pool
.getWorkerTasksUsage()).toThrowError(
134 workerNotFoundInPoolError
139 it('Verify that worker pool tasks usage are initialized', async () => {
140 const pool
= new FixedClusterPool(
142 './tests/worker-files/cluster/testWorker.js'
144 for (const workerItem
of pool
.workers
) {
145 expect(workerItem
.tasksUsage
).toBeDefined()
146 expect(workerItem
.tasksUsage
.run
).toBe(0)
147 expect(workerItem
.tasksUsage
.running
).toBe(0)
148 expect(workerItem
.tasksUsage
.runTime
).toBe(0)
149 expect(workerItem
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
150 expect(workerItem
.tasksUsage
.avgRunTime
).toBe(0)
151 expect(workerItem
.tasksUsage
.medRunTime
).toBe(0)
152 expect(workerItem
.tasksUsage
.error
).toBe(0)
157 it('Verify that worker pool tasks usage are computed', async () => {
158 const pool
= new FixedClusterPool(
160 './tests/worker-files/cluster/testWorker.js'
163 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
164 promises
.push(pool
.execute())
166 for (const workerItem
of pool
.workers
) {
167 expect(workerItem
.tasksUsage
).toBeDefined()
168 expect(workerItem
.tasksUsage
.run
).toBe(0)
169 expect(workerItem
.tasksUsage
.running
).toBe(numberOfWorkers
* 2)
170 expect(workerItem
.tasksUsage
.runTime
).toBe(0)
171 expect(workerItem
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
172 expect(workerItem
.tasksUsage
.avgRunTime
).toBe(0)
173 expect(workerItem
.tasksUsage
.medRunTime
).toBe(0)
174 expect(workerItem
.tasksUsage
.error
).toBe(0)
176 await Promise
.all(promises
)
177 for (const workerItem
of pool
.workers
) {
178 expect(workerItem
.tasksUsage
).toBeDefined()
179 expect(workerItem
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
180 expect(workerItem
.tasksUsage
.running
).toBe(0)
181 expect(workerItem
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
182 expect(workerItem
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
183 expect(workerItem
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
184 expect(workerItem
.tasksUsage
.medRunTime
).toBe(0)
185 expect(workerItem
.tasksUsage
.error
).toBe(0)
190 it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
191 const pool
= new DynamicThreadPool(
194 './tests/worker-files/thread/testWorker.js'
197 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
198 promises
.push(pool
.execute())
200 await Promise
.all(promises
)
201 for (const workerItem
of pool
.workers
) {
202 expect(workerItem
.tasksUsage
).toBeDefined()
203 expect(workerItem
.tasksUsage
.run
).toBe(numberOfWorkers
* 2)
204 expect(workerItem
.tasksUsage
.running
).toBe(0)
205 expect(workerItem
.tasksUsage
.runTime
).toBeGreaterThanOrEqual(0)
206 expect(workerItem
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
207 expect(workerItem
.tasksUsage
.avgRunTime
).toBeGreaterThanOrEqual(0)
208 expect(workerItem
.tasksUsage
.medRunTime
).toBe(0)
209 expect(workerItem
.tasksUsage
.error
).toBe(0)
211 pool
.setWorkerChoiceStrategy(WorkerChoiceStrategies
.FAIR_SHARE
)
212 for (const workerItem
of pool
.workers
) {
213 expect(workerItem
.tasksUsage
).toBeDefined()
214 expect(workerItem
.tasksUsage
.run
).toBe(0)
215 expect(workerItem
.tasksUsage
.running
).toBe(0)
216 expect(workerItem
.tasksUsage
.runTime
).toBe(0)
217 expect(workerItem
.tasksUsage
.runTimeHistory
).toBeInstanceOf(CircularArray
)
218 expect(workerItem
.tasksUsage
.avgRunTime
).toBe(0)
219 expect(workerItem
.tasksUsage
.medRunTime
).toBe(0)
220 expect(workerItem
.tasksUsage
.error
).toBe(0)
225 it("Verify that pool event emitter 'full' event can register a callback", async () => {
226 const pool
= new DynamicThreadPool(
229 './tests/worker-files/thread/testWorker.js'
233 pool
.emitter
.on(PoolEvents
.full
, () => ++poolFull
)
234 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
235 promises
.push(pool
.execute())
237 await Promise
.all(promises
)
238 // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
239 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
240 expect(poolFull
).toBe(numberOfWorkers
+ 1)
244 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
245 const pool
= new FixedThreadPool(
247 './tests/worker-files/thread/testWorker.js'
251 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
252 for (let i
= 0; i
< numberOfWorkers
* 2; i
++) {
253 promises
.push(pool
.execute())
255 await Promise
.all(promises
)
256 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
257 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
258 expect(poolBusy
).toBe(numberOfWorkers
+ 1)