feat: use O(1) queue implementation
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
1 const { expect } = require('expect')
2 const {
3 DynamicThreadPool,
4 FixedClusterPool,
5 FixedThreadPool,
6 PoolEvents,
7 WorkerChoiceStrategies
8 } = require('../../../lib')
9 const { CircularArray } = require('../../../lib/circular-array')
10 const { Queue } = require('../../../lib/queue')
11
12 describe('Abstract pool test suite', () => {
// Number of workers requested for every pool built by this suite.
const numberOfWorkers = 1
// Error the suite expects when a worker lookup fails.
const workerNotFoundInPoolError = new Error('Worker could not be found in the pool worker nodes')
/**
 * Fixed thread pool stub that can empty itself, used by the suite to
 * simulate a worker that cannot be found in the pool.
 */
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
  /**
   * Replaces the worker nodes with an empty array and clears the promise
   * response map.
   */
  removeAllWorker () {
    this.workerNodes = []
    this.promiseResponseMap.clear()
  }
}
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, used by the
 * suite to simulate a pool created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    return false
  }
}
28
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports that it is not the main thread/process, so building
  // the pool must be refused.
  const createPool = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: e => console.error(e) }
    )
  expect(createPool).toThrowError('Cannot start a pool from a worker!')
})
41
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // A missing file path and an empty one must both be rejected.
  const createWithoutFilePath = () => new FixedThreadPool(numberOfWorkers)
  const createWithEmptyFilePath = () => new FixedThreadPool(numberOfWorkers, '')
  expect(createWithoutFilePath).toThrowError(expectedError)
  expect(createWithEmptyFilePath).toThrowError(expectedError)
})
53
it('Verify that numberOfWorkers is checked', () => {
  // No argument at all: the number of workers is missing.
  const createWithoutArguments = () => new FixedThreadPool()
  expect(createWithoutArguments).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
59
it('Verify that a negative number of workers is checked', () => {
  const createPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  // The rejection must be a RangeError carrying this exact message.
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
70
it('Verify that a non integer number of workers is checked', () => {
  const createPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  // The rejection must be a TypeError carrying this exact message.
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non integer number of workers'
  )
  expect(createPool).toThrowError(expectedError)
})
81
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // First pool: no options given, so the defaults are what gets checked.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: false
  })
  expect(defaultPool.opts.messageHandler).toBeUndefined()
  expect(defaultPool.opts.errorHandler).toBeUndefined()
  expect(defaultPool.opts.onlineHandler).toBeUndefined()
  expect(defaultPool.opts.exitHandler).toBeUndefined()
  await defaultPool.destroy()

  // Second pool: every option is set explicitly and must be kept as given.
  const testHandler = () => console.log('test handler executed')
  const customOptions = {
    workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED,
    workerChoiceStrategyOptions: { medRunTime: true },
    enableEvents: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  }
  const customPool = new FixedThreadPool(
    numberOfWorkers,
    workerFile,
    customOptions
  )
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LESS_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    medRunTime: true
  })
  expect(customPool.opts.messageHandler).toStrictEqual(testHandler)
  expect(customPool.opts.errorHandler).toStrictEqual(testHandler)
  expect(customPool.opts.onlineHandler).toStrictEqual(testHandler)
  expect(customPool.opts.exitHandler).toStrictEqual(testHandler)
  await customPool.destroy()
})
134
// Synchronous on purpose: both constructions throw before any pool exists,
// so there is nothing to await and nothing to destroy.
it('Verify that pool options are validated', () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // A tasks queue concurrency of 0 must be rejected at construction time.
  expect(
    () =>
      new FixedThreadPool(numberOfWorkers, workerFile, {
        enableTasksQueue: true,
        tasksQueueOptions: { concurrency: 0 }
      })
  ).toThrowError("Invalid worker tasks concurrency '0'")

  // An unknown worker choice strategy name must be rejected as well.
  expect(
    () =>
      new FixedThreadPool(numberOfWorkers, workerFile, {
        workerChoiceStrategy: 'invalidStrategy'
      })
  ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
})
158
it('Verify that worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Checks the pool options, every registered strategy and the required
  // statistics against the given `medRunTime` flag. The average run time is
  // expected to be required exactly when the median is not.
  const expectMedRunTime = medRunTime => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
      medRunTime
    })
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual({ medRunTime })
    }
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime
    ).toBe(!medRunTime)
    expect(
      pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime
    ).toBe(medRunTime)
  }

  expectMedRunTime(false)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: true })
  expectMedRunTime(true)
  pool.setWorkerChoiceStrategyOptions({ medRunTime: false })
  expectMedRunTime(false)
  await pool.destroy()
})
208
it('Verify that tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )

  // Checks the enabled flag and the queue options; when no options are
  // passed they are expected to be undefined.
  const expectTasksQueue = (enabled, options) => {
    expect(pool.opts.enableTasksQueue).toBe(enabled)
    if (options === undefined) {
      expect(pool.opts.tasksQueueOptions).toBeUndefined()
    } else {
      expect(pool.opts.tasksQueueOptions).toStrictEqual(options)
    }
  }

  expectTasksQueue(false)
  pool.enableTasksQueue(true)
  expectTasksQueue(true, { concurrency: 1 })
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueue(true, { concurrency: 2 })
  pool.enableTasksQueue(false)
  expectTasksQueue(false)
  await pool.destroy()
})
227
it('Verify that tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  // Enabling the queue without options yields a concurrency of 1.
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // A concurrency of 0 is invalid and must throw.
  const setZeroConcurrency = () => pool.setTasksQueueOptions({ concurrency: 0 })
  expect(setZeroConcurrency).toThrowError(
    "Invalid worker tasks concurrency '0'"
  )
  await pool.destroy()
})
242
it('Simulate worker not found at getWorkerTasksUsage()', async () => {
  // StubPoolWithRemoveAllWorker extends FixedThreadPool, so it is given the
  // thread worker file, like every other thread pool in this suite, rather
  // than the cluster one.
  const pool = new StubPoolWithRemoveAllWorker(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: e => console.error(e)
    }
  )
  expect(pool.workerNodes.length).toBe(numberOfWorkers)
  // Simulate worker not found.
  pool.removeAllWorker()
  expect(pool.workerNodes.length).toBe(0)
  expect(() => pool.getWorkerTasksUsage()).toThrowError(
    workerNotFoundInPoolError
  )
  await pool.destroy()
})
260
it('Verify that worker pool tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // No task has been submitted: every counter of every worker node is zero.
  for (const { tasksUsage } of pool.workerNodes) {
    expect(tasksUsage).toBeDefined()
    expect(tasksUsage.run).toBe(0)
    expect(tasksUsage.running).toBe(0)
    expect(tasksUsage.runTime).toBe(0)
    expect(tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(tasksUsage.runTimeHistory.length).toBe(0)
    expect(tasksUsage.avgRunTime).toBe(0)
    expect(tasksUsage.medRunTime).toBe(0)
    expect(tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
279
it('Verify that worker pool tasks queue are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Every worker node starts with an empty Queue instance.
  for (const { tasksQueue } of pool.workerNodes) {
    expect(tasksQueue).toBeDefined()
    expect(tasksQueue).toBeInstanceOf(Queue)
    expect(tasksQueue.size()).toBe(0)
  }
  await pool.destroy()
})
292
it('Verify that worker pool tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Number of tasks submitted per worker. Per-worker counters are compared
  // to this value, not to the pool-wide total, so the expectations do not
  // depend on numberOfWorkers being 1.
  // NOTE(review): assumes the pool's worker choice strategy spreads the
  // submitted tasks evenly across the workers - confirm if it is changed.
  const maxMultiplier = 2
  const promises = []
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.push(pool.execute())
  }
  // Tasks are submitted but not yet settled: they count as running only.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(0)
    expect(workerNode.tasksUsage.running).toBe(maxMultiplier)
    expect(workerNode.tasksUsage.runTime).toBe(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBe(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  }
  await Promise.all(promises)
  // Every task has settled: they moved from running to run.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(maxMultiplier)
    expect(workerNode.tasksUsage.running).toBe(0)
    expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
327
it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Number of tasks submitted per worker. The per-worker `run` counter is
  // compared to this value, not to the pool-wide total, so the expectation
  // does not depend on numberOfWorkers being 1.
  // NOTE(review): assumes the pool's worker choice strategy spreads the
  // submitted tasks evenly across the workers - confirm if it is changed.
  const maxMultiplier = 2
  const promises = []
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.push(pool.execute())
  }
  await Promise.all(promises)
  // Usage after the tasks have settled, before the strategy change.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(maxMultiplier)
    expect(workerNode.tasksUsage.running).toBe(0)
    expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // Changing the strategy must bring every counter back to zero.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksUsage).toBeDefined()
    expect(workerNode.tasksUsage.run).toBe(0)
    expect(workerNode.tasksUsage.running).toBe(0)
    expect(workerNode.tasksUsage.runTime).toBe(0)
    expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
    expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
    expect(workerNode.tasksUsage.avgRunTime).toBe(0)
    expect(workerNode.tasksUsage.medRunTime).toBe(0)
    expect(workerNode.tasksUsage.error).toBe(0)
  }
  await pool.destroy()
})
364
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    numberOfWorkers,
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolFull = 0
  pool.emitter.on(PoolEvents.full, () => {
    poolFull += 1
  })
  // Submit numberOfWorkers * 2 tasks at once, in order.
  const promises = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(promises)
  // The `full` event is triggered when the number of tasks submitted at once reaches the max number of workers in the dynamic pool.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
  expect(poolFull).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
383
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => {
    poolBusy += 1
  })
  // Submit numberOfWorkers * 2 tasks at once, in order.
  const promises = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  await pool.destroy()
})
401 })