refactor: cleanup worker choice strategies code
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
23ccf9d7 14const { version } = require('../../../package.json')
2431bdb4 15const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
16
17describe('Abstract pool test suite', () => {
fc027381 18 const numberOfWorkers = 2
a8884ffd 19 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
20 isMain () {
21 return false
22 }
3ec964d6 23 }
3ec964d6 24
3ec964d6 25 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
26 expect(
27 () =>
a8884ffd 28 new StubPoolWithIsMain(
7c0ba920 29 numberOfWorkers,
8d3782fa
JB
30 './tests/worker-files/thread/testWorker.js',
31 {
32 errorHandler: e => console.error(e)
33 }
34 )
d4aeae5a 35 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 36 })
c510fea7
APA
37
38 it('Verify that filePath is checked', () => {
292ad316
JB
39 const expectedError = new Error(
40 'Please specify a file with a worker implementation'
41 )
7c0ba920 42 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 43 expectedError
8d3782fa 44 )
7c0ba920 45 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 46 expectedError
8d3782fa
JB
47 )
48 })
49
50 it('Verify that numberOfWorkers is checked', () => {
51 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 52 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
53 )
54 })
55
56 it('Verify that a negative number of workers is checked', () => {
57 expect(
58 () =>
59 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
60 ).toThrowError(
473c717a
JB
61 new RangeError(
62 'Cannot instantiate a pool with a negative number of workers'
63 )
8d3782fa
JB
64 )
65 })
66
67 it('Verify that a non integer number of workers is checked', () => {
68 expect(
69 () =>
70 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
71 ).toThrowError(
473c717a 72 new TypeError(
0d80593b 73 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
74 )
75 )
c510fea7 76 })
7c0ba920 77
216541b6 78 it('Verify that dynamic pool sizing is checked', () => {
2431bdb4
JB
79 expect(
80 () =>
81 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
82 ).toThrowError(
83 new RangeError(
84 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
85 )
86 )
87 expect(
88 () =>
89 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
90 ).toThrowError(
91 new RangeError(
92 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
93 )
94 )
21f710aa
JB
95 expect(
96 () =>
97 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
98 ).toThrowError(
99 new RangeError(
100 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
101 )
102 )
2431bdb4
JB
103 })
104
fd7ebd49 105 it('Verify that pool options are checked', async () => {
7c0ba920
JB
106 let pool = new FixedThreadPool(
107 numberOfWorkers,
108 './tests/worker-files/thread/testWorker.js'
109 )
7c0ba920 110 expect(pool.emitter).toBeDefined()
1f68cede
JB
111 expect(pool.opts.enableEvents).toBe(true)
112 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 113 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 114 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
115 expect(pool.opts.workerChoiceStrategy).toBe(
116 WorkerChoiceStrategies.ROUND_ROBIN
117 )
da309861 118 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 119 runTime: { median: false },
5df69fab
JB
120 waitTime: { median: false },
121 elu: { median: false }
da309861 122 })
35cf1c03
JB
123 expect(pool.opts.messageHandler).toBeUndefined()
124 expect(pool.opts.errorHandler).toBeUndefined()
125 expect(pool.opts.onlineHandler).toBeUndefined()
126 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 127 await pool.destroy()
35cf1c03 128 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
129 pool = new FixedThreadPool(
130 numberOfWorkers,
131 './tests/worker-files/thread/testWorker.js',
132 {
e4543b14 133 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 134 workerChoiceStrategyOptions: {
932fc8be 135 runTime: { median: true },
fc027381 136 weights: { 0: 300, 1: 200 }
49be33fe 137 },
35cf1c03 138 enableEvents: false,
1f68cede 139 restartWorkerOnError: false,
ff733df7 140 enableTasksQueue: true,
d4aeae5a 141 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
142 messageHandler: testHandler,
143 errorHandler: testHandler,
144 onlineHandler: testHandler,
145 exitHandler: testHandler
7c0ba920
JB
146 }
147 )
7c0ba920 148 expect(pool.emitter).toBeUndefined()
1f68cede
JB
149 expect(pool.opts.enableEvents).toBe(false)
150 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 151 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 152 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 153 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 154 WorkerChoiceStrategies.LEAST_USED
e843b904 155 )
da309861 156 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 157 runTime: { median: true },
fc027381 158 weights: { 0: 300, 1: 200 }
da309861 159 })
35cf1c03
JB
160 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
161 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
162 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
163 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 164 await pool.destroy()
7c0ba920
JB
165 })
166
a20f0ba5 167 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
168 expect(
169 () =>
170 new FixedThreadPool(
171 numberOfWorkers,
172 './tests/worker-files/thread/testWorker.js',
173 {
f0d7f803 174 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
175 }
176 )
f0d7f803 177 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
d4aeae5a
JB
178 expect(
179 () =>
180 new FixedThreadPool(
181 numberOfWorkers,
182 './tests/worker-files/thread/testWorker.js',
183 {
f0d7f803 184 workerChoiceStrategyOptions: 'invalidOptions'
d4aeae5a
JB
185 }
186 )
f0d7f803
JB
187 ).toThrowError(
188 'Invalid worker choice strategy options: must be a plain object'
189 )
49be33fe
JB
190 expect(
191 () =>
192 new FixedThreadPool(
193 numberOfWorkers,
194 './tests/worker-files/thread/testWorker.js',
195 {
196 workerChoiceStrategyOptions: { weights: {} }
197 }
198 )
199 ).toThrowError(
200 'Invalid worker choice strategy options: must have a weight for each worker node'
201 )
f0d7f803
JB
202 expect(
203 () =>
204 new FixedThreadPool(
205 numberOfWorkers,
206 './tests/worker-files/thread/testWorker.js',
207 {
208 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
209 }
210 )
211 ).toThrowError(
212 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
213 )
214 expect(
215 () =>
216 new FixedThreadPool(
217 numberOfWorkers,
218 './tests/worker-files/thread/testWorker.js',
219 {
220 enableTasksQueue: true,
221 tasksQueueOptions: { concurrency: 0 }
222 }
223 )
224 ).toThrowError("Invalid worker tasks concurrency '0'")
225 expect(
226 () =>
227 new FixedThreadPool(
228 numberOfWorkers,
229 './tests/worker-files/thread/testWorker.js',
230 {
231 enableTasksQueue: true,
232 tasksQueueOptions: 'invalidTasksQueueOptions'
233 }
234 )
235 ).toThrowError('Invalid tasks queue options: must be a plain object')
236 expect(
237 () =>
238 new FixedThreadPool(
239 numberOfWorkers,
240 './tests/worker-files/thread/testWorker.js',
241 {
242 enableTasksQueue: true,
243 tasksQueueOptions: { concurrency: 0.2 }
244 }
245 )
246 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
d4aeae5a
JB
247 })
248
2431bdb4 249 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
250 const pool = new FixedThreadPool(
251 numberOfWorkers,
252 './tests/worker-files/thread/testWorker.js',
253 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
254 )
255 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 256 runTime: { median: false },
5df69fab
JB
257 waitTime: { median: false },
258 elu: { median: false }
a20f0ba5
JB
259 })
260 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
261 .workerChoiceStrategies) {
86bf340d 262 expect(workerChoiceStrategy.opts).toStrictEqual({
932fc8be 263 runTime: { median: false },
5df69fab
JB
264 waitTime: { median: false },
265 elu: { median: false }
86bf340d 266 })
a20f0ba5 267 }
87de9ff5
JB
268 expect(
269 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
270 ).toStrictEqual({
932fc8be
JB
271 runTime: {
272 aggregate: true,
273 average: true,
274 median: false
275 },
276 waitTime: {
277 aggregate: false,
278 average: false,
279 median: false
280 },
5df69fab 281 elu: {
9adcefab
JB
282 aggregate: true,
283 average: true,
5df69fab
JB
284 median: false
285 }
86bf340d 286 })
9adcefab
JB
287 pool.setWorkerChoiceStrategyOptions({
288 runTime: { median: true },
289 elu: { median: true }
290 })
a20f0ba5 291 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
292 runTime: { median: true },
293 elu: { median: true }
a20f0ba5
JB
294 })
295 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
296 .workerChoiceStrategies) {
932fc8be 297 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
298 runTime: { median: true },
299 elu: { median: true }
932fc8be 300 })
a20f0ba5 301 }
87de9ff5
JB
302 expect(
303 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 ).toStrictEqual({
932fc8be
JB
305 runTime: {
306 aggregate: true,
307 average: false,
308 median: true
309 },
310 waitTime: {
311 aggregate: false,
312 average: false,
313 median: false
314 },
5df69fab 315 elu: {
9adcefab 316 aggregate: true,
5df69fab 317 average: false,
9adcefab 318 median: true
5df69fab 319 }
86bf340d 320 })
9adcefab
JB
321 pool.setWorkerChoiceStrategyOptions({
322 runTime: { median: false },
323 elu: { median: false }
324 })
a20f0ba5 325 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
326 runTime: { median: false },
327 elu: { median: false }
a20f0ba5
JB
328 })
329 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
330 .workerChoiceStrategies) {
932fc8be 331 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
332 runTime: { median: false },
333 elu: { median: false }
932fc8be 334 })
a20f0ba5 335 }
87de9ff5
JB
336 expect(
337 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
338 ).toStrictEqual({
932fc8be
JB
339 runTime: {
340 aggregate: true,
341 average: true,
342 median: false
343 },
344 waitTime: {
345 aggregate: false,
346 average: false,
347 median: false
348 },
5df69fab 349 elu: {
9adcefab
JB
350 aggregate: true,
351 average: true,
5df69fab
JB
352 median: false
353 }
86bf340d 354 })
1f95d544
JB
355 expect(() =>
356 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
357 ).toThrowError(
358 'Invalid worker choice strategy options: must be a plain object'
359 )
360 expect(() =>
361 pool.setWorkerChoiceStrategyOptions({ weights: {} })
362 ).toThrowError(
363 'Invalid worker choice strategy options: must have a weight for each worker node'
364 )
365 expect(() =>
366 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
367 ).toThrowError(
368 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
369 )
a20f0ba5
JB
370 await pool.destroy()
371 })
372
2431bdb4 373 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
374 const pool = new FixedThreadPool(
375 numberOfWorkers,
376 './tests/worker-files/thread/testWorker.js'
377 )
378 expect(pool.opts.enableTasksQueue).toBe(false)
379 expect(pool.opts.tasksQueueOptions).toBeUndefined()
380 pool.enableTasksQueue(true)
381 expect(pool.opts.enableTasksQueue).toBe(true)
382 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
383 pool.enableTasksQueue(true, { concurrency: 2 })
384 expect(pool.opts.enableTasksQueue).toBe(true)
385 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
386 pool.enableTasksQueue(false)
387 expect(pool.opts.enableTasksQueue).toBe(false)
388 expect(pool.opts.tasksQueueOptions).toBeUndefined()
389 await pool.destroy()
390 })
391
2431bdb4 392 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
393 const pool = new FixedThreadPool(
394 numberOfWorkers,
395 './tests/worker-files/thread/testWorker.js',
396 { enableTasksQueue: true }
397 )
398 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
399 pool.setTasksQueueOptions({ concurrency: 2 })
400 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
401 expect(() =>
402 pool.setTasksQueueOptions('invalidTasksQueueOptions')
403 ).toThrowError('Invalid tasks queue options: must be a plain object')
a20f0ba5
JB
404 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
405 "Invalid worker tasks concurrency '0'"
406 )
f0d7f803
JB
407 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
408 'Invalid worker tasks concurrency: must be an integer'
409 )
a20f0ba5
JB
410 await pool.destroy()
411 })
412
6b27d407
JB
413 it('Verify that pool info is set', async () => {
414 let pool = new FixedThreadPool(
415 numberOfWorkers,
416 './tests/worker-files/thread/testWorker.js'
417 )
418 expect(pool.info).toStrictEqual({
23ccf9d7 419 version,
6b27d407 420 type: PoolTypes.fixed,
184855e6 421 worker: WorkerTypes.thread,
2431bdb4
JB
422 ready: false,
423 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
6b27d407
JB
424 minSize: numberOfWorkers,
425 maxSize: numberOfWorkers,
426 workerNodes: numberOfWorkers,
427 idleWorkerNodes: numberOfWorkers,
428 busyWorkerNodes: 0,
a4e07f72
JB
429 executedTasks: 0,
430 executingTasks: 0,
6b27d407 431 queuedTasks: 0,
a4e07f72
JB
432 maxQueuedTasks: 0,
433 failedTasks: 0
6b27d407 434 })
2dca6cab
JB
435 await waitPoolEvents(pool, PoolEvents.ready, 1)
436 expect(pool.info).toStrictEqual({
437 version,
438 type: PoolTypes.fixed,
439 worker: WorkerTypes.thread,
440 ready: true,
441 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
442 minSize: numberOfWorkers,
443 maxSize: numberOfWorkers,
444 workerNodes: numberOfWorkers,
445 idleWorkerNodes: numberOfWorkers,
446 busyWorkerNodes: 0,
447 executedTasks: 0,
448 executingTasks: 0,
449 queuedTasks: 0,
450 maxQueuedTasks: 0,
451 failedTasks: 0
452 })
6b27d407
JB
453 await pool.destroy()
454 pool = new DynamicClusterPool(
2431bdb4 455 Math.floor(numberOfWorkers / 2),
6b27d407 456 numberOfWorkers,
ecdfbdc0 457 './tests/worker-files/cluster/testWorker.js'
6b27d407
JB
458 )
459 expect(pool.info).toStrictEqual({
23ccf9d7 460 version,
6b27d407 461 type: PoolTypes.dynamic,
184855e6 462 worker: WorkerTypes.cluster,
2431bdb4
JB
463 ready: false,
464 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
465 minSize: Math.floor(numberOfWorkers / 2),
466 maxSize: numberOfWorkers,
467 workerNodes: Math.floor(numberOfWorkers / 2),
468 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
6b27d407 469 busyWorkerNodes: 0,
a4e07f72
JB
470 executedTasks: 0,
471 executingTasks: 0,
6b27d407 472 queuedTasks: 0,
a4e07f72
JB
473 maxQueuedTasks: 0,
474 failedTasks: 0
6b27d407 475 })
2dca6cab
JB
476 await waitPoolEvents(pool, PoolEvents.ready, 1)
477 expect(pool.info).toStrictEqual({
478 version,
479 type: PoolTypes.dynamic,
480 worker: WorkerTypes.cluster,
481 ready: true,
482 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
483 minSize: Math.floor(numberOfWorkers / 2),
484 maxSize: numberOfWorkers,
485 workerNodes: Math.floor(numberOfWorkers / 2),
486 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
487 busyWorkerNodes: 0,
488 executedTasks: 0,
489 executingTasks: 0,
490 queuedTasks: 0,
491 maxQueuedTasks: 0,
492 failedTasks: 0
493 })
6b27d407
JB
494 await pool.destroy()
495 })
496
2431bdb4 497 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
498 const pool = new FixedClusterPool(
499 numberOfWorkers,
500 './tests/worker-files/cluster/testWorker.js'
501 )
f06e48d8 502 for (const workerNode of pool.workerNodes) {
465b2940 503 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
504 tasks: {
505 executed: 0,
506 executing: 0,
507 queued: 0,
df593701 508 maxQueued: 0,
a4e07f72
JB
509 failed: 0
510 },
511 runTime: {
a4e07f72
JB
512 history: expect.any(CircularArray)
513 },
514 waitTime: {
a4e07f72
JB
515 history: expect.any(CircularArray)
516 },
5df69fab
JB
517 elu: {
518 idle: {
5df69fab
JB
519 history: expect.any(CircularArray)
520 },
521 active: {
5df69fab 522 history: expect.any(CircularArray)
f7510105 523 }
5df69fab 524 }
86bf340d 525 })
f06e48d8
JB
526 }
527 await pool.destroy()
528 })
529
2431bdb4
JB
530 it('Verify that pool worker tasks queue are initialized', async () => {
531 let pool = new FixedClusterPool(
f06e48d8
JB
532 numberOfWorkers,
533 './tests/worker-files/cluster/testWorker.js'
534 )
535 for (const workerNode of pool.workerNodes) {
536 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 537 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 538 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 539 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 540 }
fd7ebd49 541 await pool.destroy()
2431bdb4
JB
542 pool = new DynamicThreadPool(
543 Math.floor(numberOfWorkers / 2),
544 numberOfWorkers,
545 './tests/worker-files/thread/testWorker.js'
546 )
547 for (const workerNode of pool.workerNodes) {
548 expect(workerNode.tasksQueue).toBeDefined()
549 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
550 expect(workerNode.tasksQueue.size).toBe(0)
551 expect(workerNode.tasksQueue.maxSize).toBe(0)
552 }
553 })
554
555 it('Verify that pool worker info are initialized', async () => {
556 let pool = new FixedClusterPool(
557 numberOfWorkers,
558 './tests/worker-files/cluster/testWorker.js'
559 )
560 for (const workerNode of pool.workerNodes) {
561 expect(workerNode.info).toStrictEqual({
562 id: expect.any(Number),
563 type: WorkerTypes.cluster,
564 dynamic: false,
565 ready: false
566 })
567 }
2dca6cab
JB
568 await waitPoolEvents(pool, PoolEvents.ready, 1)
569 for (const workerNode of pool.workerNodes) {
570 expect(workerNode.info).toStrictEqual({
571 id: expect.any(Number),
572 type: WorkerTypes.cluster,
573 dynamic: false,
574 ready: true
575 })
576 }
2431bdb4
JB
577 await pool.destroy()
578 pool = new DynamicThreadPool(
579 Math.floor(numberOfWorkers / 2),
580 numberOfWorkers,
581 './tests/worker-files/thread/testWorker.js'
582 )
583 for (const workerNode of pool.workerNodes) {
584 expect(workerNode.info).toStrictEqual({
585 id: expect.any(Number),
586 type: WorkerTypes.thread,
587 dynamic: false,
588 ready: false
589 })
590 }
2dca6cab
JB
591 await waitPoolEvents(pool, PoolEvents.ready, 1)
592 for (const workerNode of pool.workerNodes) {
593 expect(workerNode.info).toStrictEqual({
594 id: expect.any(Number),
595 type: WorkerTypes.thread,
596 dynamic: false,
597 ready: true
598 })
599 }
bf9549ae
JB
600 })
601
2431bdb4 602 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
603 const pool = new FixedClusterPool(
604 numberOfWorkers,
605 './tests/worker-files/cluster/testWorker.js'
606 )
09c2d0d3 607 const promises = new Set()
fc027381
JB
608 const maxMultiplier = 2
609 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 610 promises.add(pool.execute())
bf9549ae 611 }
f06e48d8 612 for (const workerNode of pool.workerNodes) {
465b2940 613 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
614 tasks: {
615 executed: 0,
616 executing: maxMultiplier,
617 queued: 0,
df593701 618 maxQueued: 0,
a4e07f72
JB
619 failed: 0
620 },
621 runTime: {
a4e07f72
JB
622 history: expect.any(CircularArray)
623 },
624 waitTime: {
a4e07f72
JB
625 history: expect.any(CircularArray)
626 },
5df69fab
JB
627 elu: {
628 idle: {
5df69fab
JB
629 history: expect.any(CircularArray)
630 },
631 active: {
5df69fab 632 history: expect.any(CircularArray)
f7510105 633 }
5df69fab 634 }
86bf340d 635 })
bf9549ae
JB
636 }
637 await Promise.all(promises)
f06e48d8 638 for (const workerNode of pool.workerNodes) {
465b2940 639 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
640 tasks: {
641 executed: maxMultiplier,
642 executing: 0,
643 queued: 0,
df593701 644 maxQueued: 0,
a4e07f72
JB
645 failed: 0
646 },
647 runTime: {
a4e07f72
JB
648 history: expect.any(CircularArray)
649 },
650 waitTime: {
a4e07f72
JB
651 history: expect.any(CircularArray)
652 },
5df69fab
JB
653 elu: {
654 idle: {
5df69fab
JB
655 history: expect.any(CircularArray)
656 },
657 active: {
5df69fab 658 history: expect.any(CircularArray)
f7510105 659 }
5df69fab 660 }
86bf340d 661 })
bf9549ae 662 }
fd7ebd49 663 await pool.destroy()
bf9549ae
JB
664 })
665
2431bdb4 666 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 667 const pool = new DynamicThreadPool(
2431bdb4 668 Math.floor(numberOfWorkers / 2),
8f4878b7 669 numberOfWorkers,
9e619829
JB
670 './tests/worker-files/thread/testWorker.js'
671 )
09c2d0d3 672 const promises = new Set()
ee9f5295
JB
673 const maxMultiplier = 2
674 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 675 promises.add(pool.execute())
9e619829
JB
676 }
677 await Promise.all(promises)
f06e48d8 678 for (const workerNode of pool.workerNodes) {
465b2940 679 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
680 tasks: {
681 executed: expect.any(Number),
682 executing: 0,
683 queued: 0,
df593701 684 maxQueued: 0,
a4e07f72
JB
685 failed: 0
686 },
687 runTime: {
a4e07f72
JB
688 history: expect.any(CircularArray)
689 },
690 waitTime: {
a4e07f72
JB
691 history: expect.any(CircularArray)
692 },
5df69fab
JB
693 elu: {
694 idle: {
5df69fab
JB
695 history: expect.any(CircularArray)
696 },
697 active: {
5df69fab 698 history: expect.any(CircularArray)
f7510105 699 }
5df69fab 700 }
86bf340d 701 })
465b2940
JB
702 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
703 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
9e619829
JB
704 }
705 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 706 for (const workerNode of pool.workerNodes) {
465b2940 707 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
708 tasks: {
709 executed: 0,
710 executing: 0,
711 queued: 0,
df593701 712 maxQueued: 0,
a4e07f72
JB
713 failed: 0
714 },
715 runTime: {
a4e07f72
JB
716 history: expect.any(CircularArray)
717 },
718 waitTime: {
a4e07f72
JB
719 history: expect.any(CircularArray)
720 },
5df69fab
JB
721 elu: {
722 idle: {
5df69fab
JB
723 history: expect.any(CircularArray)
724 },
725 active: {
5df69fab 726 history: expect.any(CircularArray)
f7510105 727 }
5df69fab 728 }
86bf340d 729 })
465b2940
JB
730 expect(workerNode.usage.runTime.history.length).toBe(0)
731 expect(workerNode.usage.waitTime.history.length).toBe(0)
ee11a4a2 732 }
fd7ebd49 733 await pool.destroy()
ee11a4a2
JB
734 })
735
164d950a
JB
736 it("Verify that pool event emitter 'full' event can register a callback", async () => {
737 const pool = new DynamicThreadPool(
2431bdb4 738 Math.floor(numberOfWorkers / 2),
164d950a
JB
739 numberOfWorkers,
740 './tests/worker-files/thread/testWorker.js'
741 )
09c2d0d3 742 const promises = new Set()
164d950a 743 let poolFull = 0
d46660cd
JB
744 let poolInfo
745 pool.emitter.on(PoolEvents.full, info => {
746 ++poolFull
747 poolInfo = info
748 })
164d950a 749 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 750 promises.add(pool.execute())
164d950a
JB
751 }
752 await Promise.all(promises)
2431bdb4
JB
753 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
754 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
755 expect(poolFull).toBe(numberOfWorkers * 2 - 1)
d46660cd 756 expect(poolInfo).toStrictEqual({
23ccf9d7 757 version,
d46660cd
JB
758 type: PoolTypes.dynamic,
759 worker: WorkerTypes.thread,
2431bdb4
JB
760 ready: expect.any(Boolean),
761 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
762 minSize: expect.any(Number),
763 maxSize: expect.any(Number),
764 workerNodes: expect.any(Number),
765 idleWorkerNodes: expect.any(Number),
766 busyWorkerNodes: expect.any(Number),
767 executedTasks: expect.any(Number),
768 executingTasks: expect.any(Number),
769 queuedTasks: expect.any(Number),
770 maxQueuedTasks: expect.any(Number),
771 failedTasks: expect.any(Number)
772 })
773 await pool.destroy()
774 })
775
776 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
d5024c00
JB
777 const pool = new DynamicClusterPool(
778 Math.floor(numberOfWorkers / 2),
2431bdb4
JB
779 numberOfWorkers,
780 './tests/worker-files/cluster/testWorker.js'
781 )
782 let poolReady = 0
783 let poolInfo
784 pool.emitter.on(PoolEvents.ready, info => {
785 ++poolReady
786 poolInfo = info
787 })
788 await waitPoolEvents(pool, PoolEvents.ready, 1)
789 expect(poolReady).toBe(1)
790 expect(poolInfo).toStrictEqual({
791 version,
d5024c00 792 type: PoolTypes.dynamic,
2431bdb4
JB
793 worker: WorkerTypes.cluster,
794 ready: true,
795 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
796 minSize: expect.any(Number),
797 maxSize: expect.any(Number),
798 workerNodes: expect.any(Number),
799 idleWorkerNodes: expect.any(Number),
800 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
801 executedTasks: expect.any(Number),
802 executingTasks: expect.any(Number),
d46660cd 803 queuedTasks: expect.any(Number),
a4e07f72
JB
804 maxQueuedTasks: expect.any(Number),
805 failedTasks: expect.any(Number)
d46660cd 806 })
164d950a
JB
807 await pool.destroy()
808 })
809
cf597bc5 810 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
811 const pool = new FixedThreadPool(
812 numberOfWorkers,
813 './tests/worker-files/thread/testWorker.js'
814 )
09c2d0d3 815 const promises = new Set()
7c0ba920 816 let poolBusy = 0
d46660cd
JB
817 let poolInfo
818 pool.emitter.on(PoolEvents.busy, info => {
819 ++poolBusy
820 poolInfo = info
821 })
7c0ba920 822 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 823 promises.add(pool.execute())
7c0ba920 824 }
cf597bc5 825 await Promise.all(promises)
14916bf9
JB
826 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
827 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
828 expect(poolBusy).toBe(numberOfWorkers + 1)
d46660cd 829 expect(poolInfo).toStrictEqual({
23ccf9d7 830 version,
d46660cd
JB
831 type: PoolTypes.fixed,
832 worker: WorkerTypes.thread,
2431bdb4
JB
833 ready: expect.any(Boolean),
834 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
835 minSize: expect.any(Number),
836 maxSize: expect.any(Number),
837 workerNodes: expect.any(Number),
838 idleWorkerNodes: expect.any(Number),
839 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
840 executedTasks: expect.any(Number),
841 executingTasks: expect.any(Number),
d46660cd 842 queuedTasks: expect.any(Number),
a4e07f72
JB
843 maxQueuedTasks: expect.any(Number),
844 failedTasks: expect.any(Number)
d46660cd 845 })
fd7ebd49 846 await pool.destroy()
7c0ba920 847 })
70a4f5ea
JB
848
849 it('Verify that multiple tasks worker is working', async () => {
850 const pool = new DynamicClusterPool(
2431bdb4 851 Math.floor(numberOfWorkers / 2),
70a4f5ea 852 numberOfWorkers,
70a4f5ea
JB
853 './tests/worker-files/cluster/testMultiTasksWorker.js'
854 )
855 const data = { n: 10 }
82888165 856 const result0 = await pool.execute(data)
30b963d4 857 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 858 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 859 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
860 const result2 = await pool.execute(data, 'factorial')
861 expect(result2).toBe(3628800)
862 const result3 = await pool.execute(data, 'fibonacci')
024daf59 863 expect(result3).toBe(55)
70a4f5ea 864 })
3ec964d6 865})