feat: add pool and worker readyness tracking infrastructure
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
a61a0724 1const { expect } = require('expect')
e843b904 2const {
70a4f5ea 3 DynamicClusterPool,
9e619829 4 DynamicThreadPool,
aee46736 5 FixedClusterPool,
e843b904 6 FixedThreadPool,
aee46736 7 PoolEvents,
6b27d407 8 WorkerChoiceStrategies,
184855e6
JB
9 PoolTypes,
10 WorkerTypes
cdace0e5 11} = require('../../../lib')
78099a15 12const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 13const { Queue } = require('../../../lib/queue')
23ccf9d7 14const { version } = require('../../../package.json')
2431bdb4 15const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
16
17describe('Abstract pool test suite', () => {
fc027381 18 const numberOfWorkers = 2
a8884ffd 19 class StubPoolWithRemoveAllWorker extends FixedThreadPool {
e1ffb94f 20 removeAllWorker () {
d4aeae5a 21 this.workerNodes = []
c923ce56 22 this.promiseResponseMap.clear()
2431bdb4 23 this.handleWorkerReadyMessage = () => {}
e1ffb94f 24 }
3ec964d6 25 }
a8884ffd 26 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
27 isMain () {
28 return false
29 }
3ec964d6 30 }
3ec964d6 31
3ec964d6 32 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
33 expect(
34 () =>
a8884ffd 35 new StubPoolWithIsMain(
7c0ba920 36 numberOfWorkers,
8d3782fa
JB
37 './tests/worker-files/thread/testWorker.js',
38 {
39 errorHandler: e => console.error(e)
40 }
41 )
d4aeae5a 42 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 43 })
c510fea7
APA
44
45 it('Verify that filePath is checked', () => {
292ad316
JB
46 const expectedError = new Error(
47 'Please specify a file with a worker implementation'
48 )
7c0ba920 49 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 50 expectedError
8d3782fa 51 )
7c0ba920 52 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 53 expectedError
8d3782fa
JB
54 )
55 })
56
57 it('Verify that numberOfWorkers is checked', () => {
58 expect(() => new FixedThreadPool()).toThrowError(
d4aeae5a 59 'Cannot instantiate a pool without specifying the number of workers'
8d3782fa
JB
60 )
61 })
62
63 it('Verify that a negative number of workers is checked', () => {
64 expect(
65 () =>
66 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
67 ).toThrowError(
473c717a
JB
68 new RangeError(
69 'Cannot instantiate a pool with a negative number of workers'
70 )
8d3782fa
JB
71 )
72 })
73
74 it('Verify that a non integer number of workers is checked', () => {
75 expect(
76 () =>
77 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
78 ).toThrowError(
473c717a 79 new TypeError(
0d80593b 80 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
81 )
82 )
c510fea7 83 })
7c0ba920 84
2431bdb4
JB
85 it('Verify dynamic pool sizing', () => {
86 expect(
87 () =>
88 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
89 ).toThrowError(
90 new RangeError(
91 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
92 )
93 )
94 expect(
95 () =>
96 new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
97 ).toThrowError(
98 new RangeError(
99 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
100 )
101 )
102 })
103
fd7ebd49 104 it('Verify that pool options are checked', async () => {
7c0ba920
JB
105 let pool = new FixedThreadPool(
106 numberOfWorkers,
107 './tests/worker-files/thread/testWorker.js'
108 )
7c0ba920 109 expect(pool.emitter).toBeDefined()
1f68cede
JB
110 expect(pool.opts.enableEvents).toBe(true)
111 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 112 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 113 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
114 expect(pool.opts.workerChoiceStrategy).toBe(
115 WorkerChoiceStrategies.ROUND_ROBIN
116 )
da309861 117 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 118 runTime: { median: false },
5df69fab
JB
119 waitTime: { median: false },
120 elu: { median: false }
da309861 121 })
35cf1c03
JB
122 expect(pool.opts.messageHandler).toBeUndefined()
123 expect(pool.opts.errorHandler).toBeUndefined()
124 expect(pool.opts.onlineHandler).toBeUndefined()
125 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 126 await pool.destroy()
35cf1c03 127 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
128 pool = new FixedThreadPool(
129 numberOfWorkers,
130 './tests/worker-files/thread/testWorker.js',
131 {
e4543b14 132 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 133 workerChoiceStrategyOptions: {
932fc8be 134 runTime: { median: true },
fc027381 135 weights: { 0: 300, 1: 200 }
49be33fe 136 },
35cf1c03 137 enableEvents: false,
1f68cede 138 restartWorkerOnError: false,
ff733df7 139 enableTasksQueue: true,
d4aeae5a 140 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
141 messageHandler: testHandler,
142 errorHandler: testHandler,
143 onlineHandler: testHandler,
144 exitHandler: testHandler
7c0ba920
JB
145 }
146 )
7c0ba920 147 expect(pool.emitter).toBeUndefined()
1f68cede
JB
148 expect(pool.opts.enableEvents).toBe(false)
149 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 150 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 151 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 152 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 153 WorkerChoiceStrategies.LEAST_USED
e843b904 154 )
da309861 155 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 156 runTime: { median: true },
fc027381 157 weights: { 0: 300, 1: 200 }
da309861 158 })
35cf1c03
JB
159 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
160 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
161 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
162 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 163 await pool.destroy()
7c0ba920
JB
164 })
165
a20f0ba5 166 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
167 expect(
168 () =>
169 new FixedThreadPool(
170 numberOfWorkers,
171 './tests/worker-files/thread/testWorker.js',
172 {
f0d7f803 173 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
174 }
175 )
f0d7f803 176 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
d4aeae5a
JB
177 expect(
178 () =>
179 new FixedThreadPool(
180 numberOfWorkers,
181 './tests/worker-files/thread/testWorker.js',
182 {
f0d7f803 183 workerChoiceStrategyOptions: 'invalidOptions'
d4aeae5a
JB
184 }
185 )
f0d7f803
JB
186 ).toThrowError(
187 'Invalid worker choice strategy options: must be a plain object'
188 )
49be33fe
JB
189 expect(
190 () =>
191 new FixedThreadPool(
192 numberOfWorkers,
193 './tests/worker-files/thread/testWorker.js',
194 {
195 workerChoiceStrategyOptions: { weights: {} }
196 }
197 )
198 ).toThrowError(
199 'Invalid worker choice strategy options: must have a weight for each worker node'
200 )
f0d7f803
JB
201 expect(
202 () =>
203 new FixedThreadPool(
204 numberOfWorkers,
205 './tests/worker-files/thread/testWorker.js',
206 {
207 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
208 }
209 )
210 ).toThrowError(
211 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
212 )
213 expect(
214 () =>
215 new FixedThreadPool(
216 numberOfWorkers,
217 './tests/worker-files/thread/testWorker.js',
218 {
219 enableTasksQueue: true,
220 tasksQueueOptions: { concurrency: 0 }
221 }
222 )
223 ).toThrowError("Invalid worker tasks concurrency '0'")
224 expect(
225 () =>
226 new FixedThreadPool(
227 numberOfWorkers,
228 './tests/worker-files/thread/testWorker.js',
229 {
230 enableTasksQueue: true,
231 tasksQueueOptions: 'invalidTasksQueueOptions'
232 }
233 )
234 ).toThrowError('Invalid tasks queue options: must be a plain object')
235 expect(
236 () =>
237 new FixedThreadPool(
238 numberOfWorkers,
239 './tests/worker-files/thread/testWorker.js',
240 {
241 enableTasksQueue: true,
242 tasksQueueOptions: { concurrency: 0.2 }
243 }
244 )
245 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
d4aeae5a
JB
246 })
247
2431bdb4 248 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
249 const pool = new FixedThreadPool(
250 numberOfWorkers,
251 './tests/worker-files/thread/testWorker.js',
252 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
253 )
254 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 255 runTime: { median: false },
5df69fab
JB
256 waitTime: { median: false },
257 elu: { median: false }
a20f0ba5
JB
258 })
259 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
260 .workerChoiceStrategies) {
86bf340d 261 expect(workerChoiceStrategy.opts).toStrictEqual({
932fc8be 262 runTime: { median: false },
5df69fab
JB
263 waitTime: { median: false },
264 elu: { median: false }
86bf340d 265 })
a20f0ba5 266 }
87de9ff5
JB
267 expect(
268 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
269 ).toStrictEqual({
932fc8be
JB
270 runTime: {
271 aggregate: true,
272 average: true,
273 median: false
274 },
275 waitTime: {
276 aggregate: false,
277 average: false,
278 median: false
279 },
5df69fab 280 elu: {
9adcefab
JB
281 aggregate: true,
282 average: true,
5df69fab
JB
283 median: false
284 }
86bf340d 285 })
9adcefab
JB
286 pool.setWorkerChoiceStrategyOptions({
287 runTime: { median: true },
288 elu: { median: true }
289 })
a20f0ba5 290 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
291 runTime: { median: true },
292 elu: { median: true }
a20f0ba5
JB
293 })
294 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
295 .workerChoiceStrategies) {
932fc8be 296 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
297 runTime: { median: true },
298 elu: { median: true }
932fc8be 299 })
a20f0ba5 300 }
87de9ff5
JB
301 expect(
302 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
303 ).toStrictEqual({
932fc8be
JB
304 runTime: {
305 aggregate: true,
306 average: false,
307 median: true
308 },
309 waitTime: {
310 aggregate: false,
311 average: false,
312 median: false
313 },
5df69fab 314 elu: {
9adcefab 315 aggregate: true,
5df69fab 316 average: false,
9adcefab 317 median: true
5df69fab 318 }
86bf340d 319 })
9adcefab
JB
320 pool.setWorkerChoiceStrategyOptions({
321 runTime: { median: false },
322 elu: { median: false }
323 })
a20f0ba5 324 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
325 runTime: { median: false },
326 elu: { median: false }
a20f0ba5
JB
327 })
328 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
329 .workerChoiceStrategies) {
932fc8be 330 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
331 runTime: { median: false },
332 elu: { median: false }
932fc8be 333 })
a20f0ba5 334 }
87de9ff5
JB
335 expect(
336 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
337 ).toStrictEqual({
932fc8be
JB
338 runTime: {
339 aggregate: true,
340 average: true,
341 median: false
342 },
343 waitTime: {
344 aggregate: false,
345 average: false,
346 median: false
347 },
5df69fab 348 elu: {
9adcefab
JB
349 aggregate: true,
350 average: true,
5df69fab
JB
351 median: false
352 }
86bf340d 353 })
1f95d544
JB
354 expect(() =>
355 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
356 ).toThrowError(
357 'Invalid worker choice strategy options: must be a plain object'
358 )
359 expect(() =>
360 pool.setWorkerChoiceStrategyOptions({ weights: {} })
361 ).toThrowError(
362 'Invalid worker choice strategy options: must have a weight for each worker node'
363 )
364 expect(() =>
365 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
366 ).toThrowError(
367 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
368 )
a20f0ba5
JB
369 await pool.destroy()
370 })
371
2431bdb4 372 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
373 const pool = new FixedThreadPool(
374 numberOfWorkers,
375 './tests/worker-files/thread/testWorker.js'
376 )
377 expect(pool.opts.enableTasksQueue).toBe(false)
378 expect(pool.opts.tasksQueueOptions).toBeUndefined()
379 pool.enableTasksQueue(true)
380 expect(pool.opts.enableTasksQueue).toBe(true)
381 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
382 pool.enableTasksQueue(true, { concurrency: 2 })
383 expect(pool.opts.enableTasksQueue).toBe(true)
384 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
385 pool.enableTasksQueue(false)
386 expect(pool.opts.enableTasksQueue).toBe(false)
387 expect(pool.opts.tasksQueueOptions).toBeUndefined()
388 await pool.destroy()
389 })
390
2431bdb4 391 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
392 const pool = new FixedThreadPool(
393 numberOfWorkers,
394 './tests/worker-files/thread/testWorker.js',
395 { enableTasksQueue: true }
396 )
397 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
398 pool.setTasksQueueOptions({ concurrency: 2 })
399 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
400 expect(() =>
401 pool.setTasksQueueOptions('invalidTasksQueueOptions')
402 ).toThrowError('Invalid tasks queue options: must be a plain object')
a20f0ba5
JB
403 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
404 "Invalid worker tasks concurrency '0'"
405 )
f0d7f803
JB
406 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
407 'Invalid worker tasks concurrency: must be an integer'
408 )
a20f0ba5
JB
409 await pool.destroy()
410 })
411
6b27d407
JB
412 it('Verify that pool info is set', async () => {
413 let pool = new FixedThreadPool(
414 numberOfWorkers,
415 './tests/worker-files/thread/testWorker.js'
416 )
417 expect(pool.info).toStrictEqual({
23ccf9d7 418 version,
6b27d407 419 type: PoolTypes.fixed,
184855e6 420 worker: WorkerTypes.thread,
2431bdb4
JB
421 ready: false,
422 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
6b27d407
JB
423 minSize: numberOfWorkers,
424 maxSize: numberOfWorkers,
425 workerNodes: numberOfWorkers,
426 idleWorkerNodes: numberOfWorkers,
427 busyWorkerNodes: 0,
a4e07f72
JB
428 executedTasks: 0,
429 executingTasks: 0,
6b27d407 430 queuedTasks: 0,
a4e07f72
JB
431 maxQueuedTasks: 0,
432 failedTasks: 0
6b27d407
JB
433 })
434 await pool.destroy()
435 pool = new DynamicClusterPool(
2431bdb4 436 Math.floor(numberOfWorkers / 2),
6b27d407 437 numberOfWorkers,
ecdfbdc0 438 './tests/worker-files/cluster/testWorker.js'
6b27d407
JB
439 )
440 expect(pool.info).toStrictEqual({
23ccf9d7 441 version,
6b27d407 442 type: PoolTypes.dynamic,
184855e6 443 worker: WorkerTypes.cluster,
2431bdb4
JB
444 ready: false,
445 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
446 minSize: Math.floor(numberOfWorkers / 2),
447 maxSize: numberOfWorkers,
448 workerNodes: Math.floor(numberOfWorkers / 2),
449 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
6b27d407 450 busyWorkerNodes: 0,
a4e07f72
JB
451 executedTasks: 0,
452 executingTasks: 0,
6b27d407 453 queuedTasks: 0,
a4e07f72
JB
454 maxQueuedTasks: 0,
455 failedTasks: 0
6b27d407
JB
456 })
457 await pool.destroy()
458 })
459
9d16d33e 460 it('Simulate worker not found', async () => {
a8884ffd 461 const pool = new StubPoolWithRemoveAllWorker(
10fcfaf4 462 numberOfWorkers,
465b2940 463 './tests/worker-files/thread/testWorker.js',
10fcfaf4 464 {
10fcfaf4
JB
465 errorHandler: e => console.error(e)
466 }
467 )
d4aeae5a 468 expect(pool.workerNodes.length).toBe(numberOfWorkers)
10fcfaf4
JB
469 // Simulate worker not found.
470 pool.removeAllWorker()
d4aeae5a 471 expect(pool.workerNodes.length).toBe(0)
fd7ebd49 472 await pool.destroy()
bf9549ae
JB
473 })
474
2431bdb4 475 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
476 const pool = new FixedClusterPool(
477 numberOfWorkers,
478 './tests/worker-files/cluster/testWorker.js'
479 )
f06e48d8 480 for (const workerNode of pool.workerNodes) {
465b2940 481 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
482 tasks: {
483 executed: 0,
484 executing: 0,
485 queued: 0,
df593701 486 maxQueued: 0,
a4e07f72
JB
487 failed: 0
488 },
489 runTime: {
a4e07f72
JB
490 history: expect.any(CircularArray)
491 },
492 waitTime: {
a4e07f72
JB
493 history: expect.any(CircularArray)
494 },
5df69fab
JB
495 elu: {
496 idle: {
5df69fab
JB
497 history: expect.any(CircularArray)
498 },
499 active: {
5df69fab 500 history: expect.any(CircularArray)
f7510105 501 }
5df69fab 502 }
86bf340d 503 })
f06e48d8
JB
504 }
505 await pool.destroy()
506 })
507
2431bdb4
JB
508 it('Verify that pool worker tasks queue are initialized', async () => {
509 let pool = new FixedClusterPool(
f06e48d8
JB
510 numberOfWorkers,
511 './tests/worker-files/cluster/testWorker.js'
512 )
513 for (const workerNode of pool.workerNodes) {
514 expect(workerNode.tasksQueue).toBeDefined()
29ee7e9a 515 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
4d8bf9e4 516 expect(workerNode.tasksQueue.size).toBe(0)
9c16fb4b 517 expect(workerNode.tasksQueue.maxSize).toBe(0)
bf9549ae 518 }
fd7ebd49 519 await pool.destroy()
2431bdb4
JB
520 pool = new DynamicThreadPool(
521 Math.floor(numberOfWorkers / 2),
522 numberOfWorkers,
523 './tests/worker-files/thread/testWorker.js'
524 )
525 for (const workerNode of pool.workerNodes) {
526 expect(workerNode.tasksQueue).toBeDefined()
527 expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
528 expect(workerNode.tasksQueue.size).toBe(0)
529 expect(workerNode.tasksQueue.maxSize).toBe(0)
530 }
531 })
532
533 it('Verify that pool worker info are initialized', async () => {
534 let pool = new FixedClusterPool(
535 numberOfWorkers,
536 './tests/worker-files/cluster/testWorker.js'
537 )
538 for (const workerNode of pool.workerNodes) {
539 expect(workerNode.info).toStrictEqual({
540 id: expect.any(Number),
541 type: WorkerTypes.cluster,
542 dynamic: false,
543 ready: false
544 })
545 }
546 await pool.destroy()
547 pool = new DynamicThreadPool(
548 Math.floor(numberOfWorkers / 2),
549 numberOfWorkers,
550 './tests/worker-files/thread/testWorker.js'
551 )
552 for (const workerNode of pool.workerNodes) {
553 expect(workerNode.info).toStrictEqual({
554 id: expect.any(Number),
555 type: WorkerTypes.thread,
556 dynamic: false,
557 ready: false
558 })
559 }
bf9549ae
JB
560 })
561
2431bdb4 562 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
563 const pool = new FixedClusterPool(
564 numberOfWorkers,
565 './tests/worker-files/cluster/testWorker.js'
566 )
09c2d0d3 567 const promises = new Set()
fc027381
JB
568 const maxMultiplier = 2
569 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 570 promises.add(pool.execute())
bf9549ae 571 }
f06e48d8 572 for (const workerNode of pool.workerNodes) {
465b2940 573 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
574 tasks: {
575 executed: 0,
576 executing: maxMultiplier,
577 queued: 0,
df593701 578 maxQueued: 0,
a4e07f72
JB
579 failed: 0
580 },
581 runTime: {
a4e07f72
JB
582 history: expect.any(CircularArray)
583 },
584 waitTime: {
a4e07f72
JB
585 history: expect.any(CircularArray)
586 },
5df69fab
JB
587 elu: {
588 idle: {
5df69fab
JB
589 history: expect.any(CircularArray)
590 },
591 active: {
5df69fab 592 history: expect.any(CircularArray)
f7510105 593 }
5df69fab 594 }
86bf340d 595 })
bf9549ae
JB
596 }
597 await Promise.all(promises)
f06e48d8 598 for (const workerNode of pool.workerNodes) {
465b2940 599 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
600 tasks: {
601 executed: maxMultiplier,
602 executing: 0,
603 queued: 0,
df593701 604 maxQueued: 0,
a4e07f72
JB
605 failed: 0
606 },
607 runTime: {
a4e07f72
JB
608 history: expect.any(CircularArray)
609 },
610 waitTime: {
a4e07f72
JB
611 history: expect.any(CircularArray)
612 },
5df69fab
JB
613 elu: {
614 idle: {
5df69fab
JB
615 history: expect.any(CircularArray)
616 },
617 active: {
5df69fab 618 history: expect.any(CircularArray)
f7510105 619 }
5df69fab 620 }
86bf340d 621 })
bf9549ae 622 }
fd7ebd49 623 await pool.destroy()
bf9549ae
JB
624 })
625
2431bdb4 626 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 627 const pool = new DynamicThreadPool(
2431bdb4 628 Math.floor(numberOfWorkers / 2),
8f4878b7 629 numberOfWorkers,
9e619829
JB
630 './tests/worker-files/thread/testWorker.js'
631 )
09c2d0d3 632 const promises = new Set()
ee9f5295
JB
633 const maxMultiplier = 2
634 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 635 promises.add(pool.execute())
9e619829
JB
636 }
637 await Promise.all(promises)
f06e48d8 638 for (const workerNode of pool.workerNodes) {
465b2940 639 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
640 tasks: {
641 executed: expect.any(Number),
642 executing: 0,
643 queued: 0,
df593701 644 maxQueued: 0,
a4e07f72
JB
645 failed: 0
646 },
647 runTime: {
a4e07f72
JB
648 history: expect.any(CircularArray)
649 },
650 waitTime: {
a4e07f72
JB
651 history: expect.any(CircularArray)
652 },
5df69fab
JB
653 elu: {
654 idle: {
5df69fab
JB
655 history: expect.any(CircularArray)
656 },
657 active: {
5df69fab 658 history: expect.any(CircularArray)
f7510105 659 }
5df69fab 660 }
86bf340d 661 })
465b2940
JB
662 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
663 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
9e619829
JB
664 }
665 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 666 for (const workerNode of pool.workerNodes) {
465b2940 667 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
668 tasks: {
669 executed: 0,
670 executing: 0,
671 queued: 0,
df593701 672 maxQueued: 0,
a4e07f72
JB
673 failed: 0
674 },
675 runTime: {
a4e07f72
JB
676 history: expect.any(CircularArray)
677 },
678 waitTime: {
a4e07f72
JB
679 history: expect.any(CircularArray)
680 },
5df69fab
JB
681 elu: {
682 idle: {
5df69fab
JB
683 history: expect.any(CircularArray)
684 },
685 active: {
5df69fab 686 history: expect.any(CircularArray)
f7510105 687 }
5df69fab 688 }
86bf340d 689 })
465b2940
JB
690 expect(workerNode.usage.runTime.history.length).toBe(0)
691 expect(workerNode.usage.waitTime.history.length).toBe(0)
ee11a4a2 692 }
fd7ebd49 693 await pool.destroy()
ee11a4a2
JB
694 })
695
164d950a
JB
696 it("Verify that pool event emitter 'full' event can register a callback", async () => {
697 const pool = new DynamicThreadPool(
2431bdb4 698 Math.floor(numberOfWorkers / 2),
164d950a
JB
699 numberOfWorkers,
700 './tests/worker-files/thread/testWorker.js'
701 )
09c2d0d3 702 const promises = new Set()
164d950a 703 let poolFull = 0
d46660cd
JB
704 let poolInfo
705 pool.emitter.on(PoolEvents.full, info => {
706 ++poolFull
707 poolInfo = info
708 })
164d950a 709 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 710 promises.add(pool.execute())
164d950a
JB
711 }
712 await Promise.all(promises)
2431bdb4
JB
713 // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
714 // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
715 expect(poolFull).toBe(numberOfWorkers * 2 - 1)
d46660cd 716 expect(poolInfo).toStrictEqual({
23ccf9d7 717 version,
d46660cd
JB
718 type: PoolTypes.dynamic,
719 worker: WorkerTypes.thread,
2431bdb4
JB
720 ready: expect.any(Boolean),
721 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
722 minSize: expect.any(Number),
723 maxSize: expect.any(Number),
724 workerNodes: expect.any(Number),
725 idleWorkerNodes: expect.any(Number),
726 busyWorkerNodes: expect.any(Number),
727 executedTasks: expect.any(Number),
728 executingTasks: expect.any(Number),
729 queuedTasks: expect.any(Number),
730 maxQueuedTasks: expect.any(Number),
731 failedTasks: expect.any(Number)
732 })
733 await pool.destroy()
734 })
735
736 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
737 const pool = new FixedClusterPool(
738 numberOfWorkers,
739 './tests/worker-files/cluster/testWorker.js'
740 )
741 let poolReady = 0
742 let poolInfo
743 pool.emitter.on(PoolEvents.ready, info => {
744 ++poolReady
745 poolInfo = info
746 })
747 await waitPoolEvents(pool, PoolEvents.ready, 1)
748 expect(poolReady).toBe(1)
749 expect(poolInfo).toStrictEqual({
750 version,
751 type: PoolTypes.fixed,
752 worker: WorkerTypes.cluster,
753 ready: true,
754 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
755 minSize: expect.any(Number),
756 maxSize: expect.any(Number),
757 workerNodes: expect.any(Number),
758 idleWorkerNodes: expect.any(Number),
759 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
760 executedTasks: expect.any(Number),
761 executingTasks: expect.any(Number),
d46660cd 762 queuedTasks: expect.any(Number),
a4e07f72
JB
763 maxQueuedTasks: expect.any(Number),
764 failedTasks: expect.any(Number)
d46660cd 765 })
164d950a
JB
766 await pool.destroy()
767 })
768
cf597bc5 769 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
770 const pool = new FixedThreadPool(
771 numberOfWorkers,
772 './tests/worker-files/thread/testWorker.js'
773 )
09c2d0d3 774 const promises = new Set()
7c0ba920 775 let poolBusy = 0
d46660cd
JB
776 let poolInfo
777 pool.emitter.on(PoolEvents.busy, info => {
778 ++poolBusy
779 poolInfo = info
780 })
7c0ba920 781 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 782 promises.add(pool.execute())
7c0ba920 783 }
cf597bc5 784 await Promise.all(promises)
14916bf9
JB
785 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
786 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
787 expect(poolBusy).toBe(numberOfWorkers + 1)
d46660cd 788 expect(poolInfo).toStrictEqual({
23ccf9d7 789 version,
d46660cd
JB
790 type: PoolTypes.fixed,
791 worker: WorkerTypes.thread,
2431bdb4
JB
792 ready: expect.any(Boolean),
793 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
794 minSize: expect.any(Number),
795 maxSize: expect.any(Number),
796 workerNodes: expect.any(Number),
797 idleWorkerNodes: expect.any(Number),
798 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
799 executedTasks: expect.any(Number),
800 executingTasks: expect.any(Number),
d46660cd 801 queuedTasks: expect.any(Number),
a4e07f72
JB
802 maxQueuedTasks: expect.any(Number),
803 failedTasks: expect.any(Number)
d46660cd 804 })
fd7ebd49 805 await pool.destroy()
7c0ba920 806 })
70a4f5ea
JB
807
808 it('Verify that multiple tasks worker is working', async () => {
809 const pool = new DynamicClusterPool(
2431bdb4 810 Math.floor(numberOfWorkers / 2),
70a4f5ea 811 numberOfWorkers,
70a4f5ea
JB
812 './tests/worker-files/cluster/testMultiTasksWorker.js'
813 )
814 const data = { n: 10 }
82888165 815 const result0 = await pool.execute(data)
30b963d4 816 expect(result0).toStrictEqual({ ok: 1 })
70a4f5ea 817 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
30b963d4 818 expect(result1).toStrictEqual({ ok: 1 })
70a4f5ea
JB
819 const result2 = await pool.execute(data, 'factorial')
820 expect(result2).toBe(3628800)
821 const result3 = await pool.execute(data, 'fibonacci')
024daf59 822 expect(result3).toBe(55)
70a4f5ea 823 })
3ec964d6 824})