fix: ensure the task concurrency is respected at queued task
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
85aeb3f3 1const { MessageChannel } = require('worker_threads')
a61a0724 2const { expect } = require('expect')
e843b904 3const {
70a4f5ea 4 DynamicClusterPool,
9e619829 5 DynamicThreadPool,
aee46736 6 FixedClusterPool,
e843b904 7 FixedThreadPool,
aee46736 8 PoolEvents,
184855e6 9 PoolTypes,
3d6dd312 10 WorkerChoiceStrategies,
184855e6 11 WorkerTypes
cdace0e5 12} = require('../../../lib')
78099a15 13const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 14const { Queue } = require('../../../lib/queue')
23ccf9d7 15const { version } = require('../../../package.json')
2431bdb4 16const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
17
18describe('Abstract pool test suite', () => {
fc027381 19 const numberOfWorkers = 2
a8884ffd 20 class StubPoolWithIsMain extends FixedThreadPool {
e1ffb94f
JB
21 isMain () {
22 return false
23 }
3ec964d6 24 }
3ec964d6 25
3ec964d6 26 it('Simulate pool creation from a non main thread/process', () => {
8d3782fa
JB
27 expect(
28 () =>
a8884ffd 29 new StubPoolWithIsMain(
7c0ba920 30 numberOfWorkers,
8d3782fa
JB
31 './tests/worker-files/thread/testWorker.js',
32 {
33 errorHandler: e => console.error(e)
34 }
35 )
d4aeae5a 36 ).toThrowError('Cannot start a pool from a worker!')
3ec964d6 37 })
c510fea7
APA
38
39 it('Verify that filePath is checked', () => {
292ad316
JB
40 const expectedError = new Error(
41 'Please specify a file with a worker implementation'
42 )
7c0ba920 43 expect(() => new FixedThreadPool(numberOfWorkers)).toThrowError(
292ad316 44 expectedError
8d3782fa 45 )
7c0ba920 46 expect(() => new FixedThreadPool(numberOfWorkers, '')).toThrowError(
292ad316 47 expectedError
8d3782fa 48 )
3d6dd312
JB
49 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrowError(
50 expectedError
51 )
52 expect(() => new FixedThreadPool(numberOfWorkers, true)).toThrowError(
53 expectedError
54 )
55 expect(
56 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
57 ).toThrowError(new Error("Cannot find the worker file './dummyWorker.ts'"))
8d3782fa
JB
58 })
59
// A pool constructed without any argument must be rejected with an explicit
// message about the missing number of workers.
it('Verify that numberOfWorkers is checked', () => {
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
65
// A negative number of workers must be rejected with a RangeError.
it('Verify that a negative number of workers is checked', () => {
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
76
// A fractional number of workers must be rejected with a TypeError.
it('Verify that a non integer number of workers is checked', () => {
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
7c0ba920 87
216541b6 88 it('Verify that dynamic pool sizing is checked', () => {
a5ed75b7
JB
89 expect(
90 () =>
91 new DynamicClusterPool(
92 1,
93 undefined,
94 './tests/worker-files/cluster/testWorker.js'
95 )
96 ).toThrowError(
97 new TypeError(
98 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
99 )
100 )
2761efb4
JB
101 expect(
102 () =>
103 new DynamicThreadPool(
104 0.5,
105 1,
106 './tests/worker-files/thread/testWorker.js'
107 )
108 ).toThrowError(
109 new TypeError(
110 'Cannot instantiate a pool with a non safe integer number of workers'
111 )
112 )
113 expect(
114 () =>
115 new DynamicClusterPool(
116 0,
117 0.5,
a5ed75b7 118 './tests/worker-files/cluster/testWorker.js'
2761efb4
JB
119 )
120 ).toThrowError(
121 new TypeError(
122 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
123 )
124 )
2431bdb4
JB
125 expect(
126 () =>
127 new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
128 ).toThrowError(
129 new RangeError(
130 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
131 )
132 )
133 expect(
134 () =>
2761efb4
JB
135 new DynamicClusterPool(
136 1,
137 1,
a5ed75b7 138 './tests/worker-files/cluster/testWorker.js'
2761efb4 139 )
2431bdb4
JB
140 ).toThrowError(
141 new RangeError(
142 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
143 )
144 )
21f710aa
JB
145 expect(
146 () =>
147 new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
148 ).toThrowError(
149 new RangeError(
b97d82d8 150 'Cannot instantiate a dynamic pool with a pool size equal to zero'
21f710aa
JB
151 )
152 )
2431bdb4
JB
153 })
154
fd7ebd49 155 it('Verify that pool options are checked', async () => {
7c0ba920
JB
156 let pool = new FixedThreadPool(
157 numberOfWorkers,
158 './tests/worker-files/thread/testWorker.js'
159 )
7c0ba920 160 expect(pool.emitter).toBeDefined()
1f68cede
JB
161 expect(pool.opts.enableEvents).toBe(true)
162 expect(pool.opts.restartWorkerOnError).toBe(true)
ff733df7 163 expect(pool.opts.enableTasksQueue).toBe(false)
d4aeae5a 164 expect(pool.opts.tasksQueueOptions).toBeUndefined()
e843b904
JB
165 expect(pool.opts.workerChoiceStrategy).toBe(
166 WorkerChoiceStrategies.ROUND_ROBIN
167 )
da309861 168 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 169 runTime: { median: false },
5df69fab
JB
170 waitTime: { median: false },
171 elu: { median: false }
da309861 172 })
35cf1c03
JB
173 expect(pool.opts.messageHandler).toBeUndefined()
174 expect(pool.opts.errorHandler).toBeUndefined()
175 expect(pool.opts.onlineHandler).toBeUndefined()
176 expect(pool.opts.exitHandler).toBeUndefined()
fd7ebd49 177 await pool.destroy()
35cf1c03 178 const testHandler = () => console.log('test handler executed')
7c0ba920
JB
179 pool = new FixedThreadPool(
180 numberOfWorkers,
181 './tests/worker-files/thread/testWorker.js',
182 {
e4543b14 183 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
49be33fe 184 workerChoiceStrategyOptions: {
932fc8be 185 runTime: { median: true },
fc027381 186 weights: { 0: 300, 1: 200 }
49be33fe 187 },
35cf1c03 188 enableEvents: false,
1f68cede 189 restartWorkerOnError: false,
ff733df7 190 enableTasksQueue: true,
d4aeae5a 191 tasksQueueOptions: { concurrency: 2 },
35cf1c03
JB
192 messageHandler: testHandler,
193 errorHandler: testHandler,
194 onlineHandler: testHandler,
195 exitHandler: testHandler
7c0ba920
JB
196 }
197 )
7c0ba920 198 expect(pool.emitter).toBeUndefined()
1f68cede
JB
199 expect(pool.opts.enableEvents).toBe(false)
200 expect(pool.opts.restartWorkerOnError).toBe(false)
ff733df7 201 expect(pool.opts.enableTasksQueue).toBe(true)
d4aeae5a 202 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
e843b904 203 expect(pool.opts.workerChoiceStrategy).toBe(
e4543b14 204 WorkerChoiceStrategies.LEAST_USED
e843b904 205 )
da309861 206 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 207 runTime: { median: true },
fc027381 208 weights: { 0: 300, 1: 200 }
da309861 209 })
35cf1c03
JB
210 expect(pool.opts.messageHandler).toStrictEqual(testHandler)
211 expect(pool.opts.errorHandler).toStrictEqual(testHandler)
212 expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
213 expect(pool.opts.exitHandler).toStrictEqual(testHandler)
fd7ebd49 214 await pool.destroy()
7c0ba920
JB
215 })
216
a20f0ba5 217 it('Verify that pool options are validated', async () => {
d4aeae5a
JB
218 expect(
219 () =>
220 new FixedThreadPool(
221 numberOfWorkers,
222 './tests/worker-files/thread/testWorker.js',
223 {
f0d7f803 224 workerChoiceStrategy: 'invalidStrategy'
d4aeae5a
JB
225 }
226 )
f0d7f803 227 ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
d4aeae5a
JB
228 expect(
229 () =>
230 new FixedThreadPool(
231 numberOfWorkers,
232 './tests/worker-files/thread/testWorker.js',
233 {
f0d7f803 234 workerChoiceStrategyOptions: 'invalidOptions'
d4aeae5a
JB
235 }
236 )
f0d7f803
JB
237 ).toThrowError(
238 'Invalid worker choice strategy options: must be a plain object'
239 )
49be33fe
JB
240 expect(
241 () =>
242 new FixedThreadPool(
243 numberOfWorkers,
244 './tests/worker-files/thread/testWorker.js',
245 {
246 workerChoiceStrategyOptions: { weights: {} }
247 }
248 )
249 ).toThrowError(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
f0d7f803
JB
252 expect(
253 () =>
254 new FixedThreadPool(
255 numberOfWorkers,
256 './tests/worker-files/thread/testWorker.js',
257 {
258 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
259 }
260 )
261 ).toThrowError(
262 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
263 )
264 expect(
265 () =>
266 new FixedThreadPool(
267 numberOfWorkers,
268 './tests/worker-files/thread/testWorker.js',
269 {
270 enableTasksQueue: true,
271 tasksQueueOptions: { concurrency: 0 }
272 }
273 )
274 ).toThrowError("Invalid worker tasks concurrency '0'")
275 expect(
276 () =>
277 new FixedThreadPool(
278 numberOfWorkers,
279 './tests/worker-files/thread/testWorker.js',
280 {
281 enableTasksQueue: true,
282 tasksQueueOptions: 'invalidTasksQueueOptions'
283 }
284 )
285 ).toThrowError('Invalid tasks queue options: must be a plain object')
286 expect(
287 () =>
288 new FixedThreadPool(
289 numberOfWorkers,
290 './tests/worker-files/thread/testWorker.js',
291 {
292 enableTasksQueue: true,
293 tasksQueueOptions: { concurrency: 0.2 }
294 }
295 )
296 ).toThrowError('Invalid worker tasks concurrency: must be an integer')
d4aeae5a
JB
297 })
298
2431bdb4 299 it('Verify that pool worker choice strategy options can be set', async () => {
a20f0ba5
JB
300 const pool = new FixedThreadPool(
301 numberOfWorkers,
302 './tests/worker-files/thread/testWorker.js',
303 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
304 )
305 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
932fc8be 306 runTime: { median: false },
5df69fab
JB
307 waitTime: { median: false },
308 elu: { median: false }
a20f0ba5
JB
309 })
310 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
311 .workerChoiceStrategies) {
86bf340d 312 expect(workerChoiceStrategy.opts).toStrictEqual({
932fc8be 313 runTime: { median: false },
5df69fab
JB
314 waitTime: { median: false },
315 elu: { median: false }
86bf340d 316 })
a20f0ba5 317 }
87de9ff5
JB
318 expect(
319 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 ).toStrictEqual({
932fc8be
JB
321 runTime: {
322 aggregate: true,
323 average: true,
324 median: false
325 },
326 waitTime: {
327 aggregate: false,
328 average: false,
329 median: false
330 },
5df69fab 331 elu: {
9adcefab
JB
332 aggregate: true,
333 average: true,
5df69fab
JB
334 median: false
335 }
86bf340d 336 })
9adcefab
JB
337 pool.setWorkerChoiceStrategyOptions({
338 runTime: { median: true },
339 elu: { median: true }
340 })
a20f0ba5 341 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
342 runTime: { median: true },
343 elu: { median: true }
a20f0ba5
JB
344 })
345 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
346 .workerChoiceStrategies) {
932fc8be 347 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
348 runTime: { median: true },
349 elu: { median: true }
932fc8be 350 })
a20f0ba5 351 }
87de9ff5
JB
352 expect(
353 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
354 ).toStrictEqual({
932fc8be
JB
355 runTime: {
356 aggregate: true,
357 average: false,
358 median: true
359 },
360 waitTime: {
361 aggregate: false,
362 average: false,
363 median: false
364 },
5df69fab 365 elu: {
9adcefab 366 aggregate: true,
5df69fab 367 average: false,
9adcefab 368 median: true
5df69fab 369 }
86bf340d 370 })
9adcefab
JB
371 pool.setWorkerChoiceStrategyOptions({
372 runTime: { median: false },
373 elu: { median: false }
374 })
a20f0ba5 375 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
9adcefab
JB
376 runTime: { median: false },
377 elu: { median: false }
a20f0ba5
JB
378 })
379 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
380 .workerChoiceStrategies) {
932fc8be 381 expect(workerChoiceStrategy.opts).toStrictEqual({
9adcefab
JB
382 runTime: { median: false },
383 elu: { median: false }
932fc8be 384 })
a20f0ba5 385 }
87de9ff5
JB
386 expect(
387 pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 ).toStrictEqual({
932fc8be
JB
389 runTime: {
390 aggregate: true,
391 average: true,
392 median: false
393 },
394 waitTime: {
395 aggregate: false,
396 average: false,
397 median: false
398 },
5df69fab 399 elu: {
9adcefab
JB
400 aggregate: true,
401 average: true,
5df69fab
JB
402 median: false
403 }
86bf340d 404 })
1f95d544
JB
405 expect(() =>
406 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
407 ).toThrowError(
408 'Invalid worker choice strategy options: must be a plain object'
409 )
410 expect(() =>
411 pool.setWorkerChoiceStrategyOptions({ weights: {} })
412 ).toThrowError(
413 'Invalid worker choice strategy options: must have a weight for each worker node'
414 )
415 expect(() =>
416 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
417 ).toThrowError(
418 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
419 )
a20f0ba5
JB
420 await pool.destroy()
421 })
422
2431bdb4 423 it('Verify that pool tasks queue can be enabled/disabled', async () => {
a20f0ba5
JB
424 const pool = new FixedThreadPool(
425 numberOfWorkers,
426 './tests/worker-files/thread/testWorker.js'
427 )
428 expect(pool.opts.enableTasksQueue).toBe(false)
429 expect(pool.opts.tasksQueueOptions).toBeUndefined()
430 pool.enableTasksQueue(true)
431 expect(pool.opts.enableTasksQueue).toBe(true)
432 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
433 pool.enableTasksQueue(true, { concurrency: 2 })
434 expect(pool.opts.enableTasksQueue).toBe(true)
435 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
436 pool.enableTasksQueue(false)
437 expect(pool.opts.enableTasksQueue).toBe(false)
438 expect(pool.opts.tasksQueueOptions).toBeUndefined()
439 await pool.destroy()
440 })
441
2431bdb4 442 it('Verify that pool tasks queue options can be set', async () => {
a20f0ba5
JB
443 const pool = new FixedThreadPool(
444 numberOfWorkers,
445 './tests/worker-files/thread/testWorker.js',
446 { enableTasksQueue: true }
447 )
448 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
449 pool.setTasksQueueOptions({ concurrency: 2 })
450 expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
f0d7f803
JB
451 expect(() =>
452 pool.setTasksQueueOptions('invalidTasksQueueOptions')
453 ).toThrowError('Invalid tasks queue options: must be a plain object')
a20f0ba5
JB
454 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrowError(
455 "Invalid worker tasks concurrency '0'"
456 )
f0d7f803
JB
457 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
458 'Invalid worker tasks concurrency: must be an integer'
459 )
a20f0ba5
JB
460 await pool.destroy()
461 })
462
6b27d407
JB
463 it('Verify that pool info is set', async () => {
464 let pool = new FixedThreadPool(
465 numberOfWorkers,
466 './tests/worker-files/thread/testWorker.js'
467 )
2dca6cab
JB
468 expect(pool.info).toStrictEqual({
469 version,
470 type: PoolTypes.fixed,
471 worker: WorkerTypes.thread,
472 ready: true,
473 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
474 minSize: numberOfWorkers,
475 maxSize: numberOfWorkers,
476 workerNodes: numberOfWorkers,
477 idleWorkerNodes: numberOfWorkers,
478 busyWorkerNodes: 0,
479 executedTasks: 0,
480 executingTasks: 0,
481 queuedTasks: 0,
482 maxQueuedTasks: 0,
483 failedTasks: 0
484 })
6b27d407
JB
485 await pool.destroy()
486 pool = new DynamicClusterPool(
2431bdb4 487 Math.floor(numberOfWorkers / 2),
6b27d407 488 numberOfWorkers,
ecdfbdc0 489 './tests/worker-files/cluster/testWorker.js'
6b27d407 490 )
2dca6cab
JB
491 expect(pool.info).toStrictEqual({
492 version,
493 type: PoolTypes.dynamic,
494 worker: WorkerTypes.cluster,
495 ready: true,
496 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
497 minSize: Math.floor(numberOfWorkers / 2),
498 maxSize: numberOfWorkers,
499 workerNodes: Math.floor(numberOfWorkers / 2),
500 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
501 busyWorkerNodes: 0,
502 executedTasks: 0,
503 executingTasks: 0,
504 queuedTasks: 0,
505 maxQueuedTasks: 0,
506 failedTasks: 0
507 })
6b27d407
JB
508 await pool.destroy()
509 })
510
2431bdb4 511 it('Verify that pool worker tasks usage are initialized', async () => {
bf9549ae
JB
512 const pool = new FixedClusterPool(
513 numberOfWorkers,
514 './tests/worker-files/cluster/testWorker.js'
515 )
f06e48d8 516 for (const workerNode of pool.workerNodes) {
465b2940 517 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
518 tasks: {
519 executed: 0,
520 executing: 0,
521 queued: 0,
df593701 522 maxQueued: 0,
a4e07f72
JB
523 failed: 0
524 },
525 runTime: {
a4e07f72
JB
526 history: expect.any(CircularArray)
527 },
528 waitTime: {
a4e07f72
JB
529 history: expect.any(CircularArray)
530 },
5df69fab
JB
531 elu: {
532 idle: {
5df69fab
JB
533 history: expect.any(CircularArray)
534 },
535 active: {
5df69fab 536 history: expect.any(CircularArray)
f7510105 537 }
5df69fab 538 }
86bf340d 539 })
f06e48d8
JB
540 }
541 await pool.destroy()
542 })
543
2431bdb4
JB
// Checks that every worker node starts with an empty tasks queue, first on a
// fixed cluster pool and then on a dynamic thread pool.
it('Verify that pool worker tasks queue are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.tasksQueue).toBeDefined()
    expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
    expect(workerNode.tasksQueue.size).toBe(0)
    expect(workerNode.tasksQueue.maxSize).toBe(0)
  }
  // Destroy the second pool too, like every other test in this suite, so its
  // worker threads do not outlive the test.
  await pool.destroy()
})
568
// Checks the initial `info` of every worker node, first on a fixed cluster
// pool and then on a dynamic thread pool (whose nodes also carry a
// `messageChannel`).
it('Verify that pool worker info are initialized', async () => {
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true,
      messageChannel: expect.any(MessageChannel)
    })
  }
  // Destroy the second pool too, like every other test in this suite, so its
  // worker threads do not outlive the test.
  await pool.destroy()
})
598
2431bdb4 599 it('Verify that pool worker tasks usage are computed', async () => {
bf9549ae
JB
600 const pool = new FixedClusterPool(
601 numberOfWorkers,
602 './tests/worker-files/cluster/testWorker.js'
603 )
09c2d0d3 604 const promises = new Set()
fc027381
JB
605 const maxMultiplier = 2
606 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 607 promises.add(pool.execute())
bf9549ae 608 }
f06e48d8 609 for (const workerNode of pool.workerNodes) {
465b2940 610 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
611 tasks: {
612 executed: 0,
613 executing: maxMultiplier,
614 queued: 0,
df593701 615 maxQueued: 0,
a4e07f72
JB
616 failed: 0
617 },
618 runTime: {
a4e07f72
JB
619 history: expect.any(CircularArray)
620 },
621 waitTime: {
a4e07f72
JB
622 history: expect.any(CircularArray)
623 },
5df69fab
JB
624 elu: {
625 idle: {
5df69fab
JB
626 history: expect.any(CircularArray)
627 },
628 active: {
5df69fab 629 history: expect.any(CircularArray)
f7510105 630 }
5df69fab 631 }
86bf340d 632 })
bf9549ae
JB
633 }
634 await Promise.all(promises)
f06e48d8 635 for (const workerNode of pool.workerNodes) {
465b2940 636 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
637 tasks: {
638 executed: maxMultiplier,
639 executing: 0,
640 queued: 0,
df593701 641 maxQueued: 0,
a4e07f72
JB
642 failed: 0
643 },
644 runTime: {
a4e07f72
JB
645 history: expect.any(CircularArray)
646 },
647 waitTime: {
a4e07f72
JB
648 history: expect.any(CircularArray)
649 },
5df69fab
JB
650 elu: {
651 idle: {
5df69fab
JB
652 history: expect.any(CircularArray)
653 },
654 active: {
5df69fab 655 history: expect.any(CircularArray)
f7510105 656 }
5df69fab 657 }
86bf340d 658 })
bf9549ae 659 }
fd7ebd49 660 await pool.destroy()
bf9549ae
JB
661 })
662
2431bdb4 663 it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
7fd82a1c 664 const pool = new DynamicThreadPool(
2431bdb4 665 Math.floor(numberOfWorkers / 2),
8f4878b7 666 numberOfWorkers,
9e619829
JB
667 './tests/worker-files/thread/testWorker.js'
668 )
09c2d0d3 669 const promises = new Set()
ee9f5295
JB
670 const maxMultiplier = 2
671 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
09c2d0d3 672 promises.add(pool.execute())
9e619829
JB
673 }
674 await Promise.all(promises)
f06e48d8 675 for (const workerNode of pool.workerNodes) {
465b2940 676 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
677 tasks: {
678 executed: expect.any(Number),
679 executing: 0,
680 queued: 0,
df593701 681 maxQueued: 0,
a4e07f72
JB
682 failed: 0
683 },
684 runTime: {
a4e07f72
JB
685 history: expect.any(CircularArray)
686 },
687 waitTime: {
a4e07f72
JB
688 history: expect.any(CircularArray)
689 },
5df69fab
JB
690 elu: {
691 idle: {
5df69fab
JB
692 history: expect.any(CircularArray)
693 },
694 active: {
5df69fab 695 history: expect.any(CircularArray)
f7510105 696 }
5df69fab 697 }
86bf340d 698 })
465b2940 699 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
b56c2ee5 700 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
b97d82d8
JB
701 expect(workerNode.usage.runTime.history.length).toBe(0)
702 expect(workerNode.usage.waitTime.history.length).toBe(0)
703 expect(workerNode.usage.elu.idle.history.length).toBe(0)
704 expect(workerNode.usage.elu.active.history.length).toBe(0)
9e619829
JB
705 }
706 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
f06e48d8 707 for (const workerNode of pool.workerNodes) {
465b2940 708 expect(workerNode.usage).toStrictEqual({
a4e07f72
JB
709 tasks: {
710 executed: 0,
711 executing: 0,
712 queued: 0,
df593701 713 maxQueued: 0,
a4e07f72
JB
714 failed: 0
715 },
716 runTime: {
a4e07f72
JB
717 history: expect.any(CircularArray)
718 },
719 waitTime: {
a4e07f72
JB
720 history: expect.any(CircularArray)
721 },
5df69fab
JB
722 elu: {
723 idle: {
5df69fab
JB
724 history: expect.any(CircularArray)
725 },
726 active: {
5df69fab 727 history: expect.any(CircularArray)
f7510105 728 }
5df69fab 729 }
86bf340d 730 })
465b2940
JB
731 expect(workerNode.usage.runTime.history.length).toBe(0)
732 expect(workerNode.usage.waitTime.history.length).toBe(0)
b97d82d8
JB
733 expect(workerNode.usage.elu.idle.history.length).toBe(0)
734 expect(workerNode.usage.elu.active.history.length).toBe(0)
ee11a4a2 735 }
fd7ebd49 736 await pool.destroy()
ee11a4a2
JB
737 })
738
164d950a
JB
739 it("Verify that pool event emitter 'full' event can register a callback", async () => {
740 const pool = new DynamicThreadPool(
2431bdb4 741 Math.floor(numberOfWorkers / 2),
164d950a
JB
742 numberOfWorkers,
743 './tests/worker-files/thread/testWorker.js'
744 )
09c2d0d3 745 const promises = new Set()
164d950a 746 let poolFull = 0
d46660cd
JB
747 let poolInfo
748 pool.emitter.on(PoolEvents.full, info => {
749 ++poolFull
750 poolInfo = info
751 })
164d950a 752 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 753 promises.add(pool.execute())
164d950a
JB
754 }
755 await Promise.all(promises)
2431bdb4
JB
756 // The `full` event is triggered when the number of tasks submitted at once reaches the maximum number of workers in the dynamic pool.
757 // So in total it is emitted numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
758 expect(poolFull).toBe(numberOfWorkers * 2 - 1)
d46660cd 759 expect(poolInfo).toStrictEqual({
23ccf9d7 760 version,
d46660cd
JB
761 type: PoolTypes.dynamic,
762 worker: WorkerTypes.thread,
2431bdb4
JB
763 ready: expect.any(Boolean),
764 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
765 minSize: expect.any(Number),
766 maxSize: expect.any(Number),
767 workerNodes: expect.any(Number),
768 idleWorkerNodes: expect.any(Number),
769 busyWorkerNodes: expect.any(Number),
770 executedTasks: expect.any(Number),
771 executingTasks: expect.any(Number),
772 queuedTasks: expect.any(Number),
773 maxQueuedTasks: expect.any(Number),
774 failedTasks: expect.any(Number)
775 })
776 await pool.destroy()
777 })
778
779 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
d5024c00
JB
780 const pool = new DynamicClusterPool(
781 Math.floor(numberOfWorkers / 2),
2431bdb4
JB
782 numberOfWorkers,
783 './tests/worker-files/cluster/testWorker.js'
784 )
2431bdb4 785 let poolInfo
cc3ab78b 786 let poolReady = 0
2431bdb4
JB
787 pool.emitter.on(PoolEvents.ready, info => {
788 ++poolReady
789 poolInfo = info
790 })
791 await waitPoolEvents(pool, PoolEvents.ready, 1)
792 expect(poolReady).toBe(1)
793 expect(poolInfo).toStrictEqual({
794 version,
d5024c00 795 type: PoolTypes.dynamic,
2431bdb4
JB
796 worker: WorkerTypes.cluster,
797 ready: true,
798 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
799 minSize: expect.any(Number),
800 maxSize: expect.any(Number),
801 workerNodes: expect.any(Number),
802 idleWorkerNodes: expect.any(Number),
803 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
804 executedTasks: expect.any(Number),
805 executingTasks: expect.any(Number),
d46660cd 806 queuedTasks: expect.any(Number),
a4e07f72
JB
807 maxQueuedTasks: expect.any(Number),
808 failedTasks: expect.any(Number)
d46660cd 809 })
164d950a
JB
810 await pool.destroy()
811 })
812
cf597bc5 813 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
7c0ba920
JB
814 const pool = new FixedThreadPool(
815 numberOfWorkers,
816 './tests/worker-files/thread/testWorker.js'
817 )
09c2d0d3 818 const promises = new Set()
7c0ba920 819 let poolBusy = 0
d46660cd
JB
820 let poolInfo
821 pool.emitter.on(PoolEvents.busy, info => {
822 ++poolBusy
823 poolInfo = info
824 })
7c0ba920 825 for (let i = 0; i < numberOfWorkers * 2; i++) {
f5d14e90 826 promises.add(pool.execute())
7c0ba920 827 }
cf597bc5 828 await Promise.all(promises)
14916bf9
JB
829 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
830 // So in total it is emitted numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
831 expect(poolBusy).toBe(numberOfWorkers + 1)
d46660cd 832 expect(poolInfo).toStrictEqual({
23ccf9d7 833 version,
d46660cd
JB
834 type: PoolTypes.fixed,
835 worker: WorkerTypes.thread,
2431bdb4
JB
836 ready: expect.any(Boolean),
837 strategy: WorkerChoiceStrategies.ROUND_ROBIN,
d46660cd
JB
838 minSize: expect.any(Number),
839 maxSize: expect.any(Number),
840 workerNodes: expect.any(Number),
841 idleWorkerNodes: expect.any(Number),
842 busyWorkerNodes: expect.any(Number),
a4e07f72
JB
843 executedTasks: expect.any(Number),
844 executingTasks: expect.any(Number),
d46660cd 845 queuedTasks: expect.any(Number),
a4e07f72
JB
846 maxQueuedTasks: expect.any(Number),
847 failedTasks: expect.any(Number)
d46660cd 848 })
fd7ebd49 849 await pool.destroy()
7c0ba920 850 })
70a4f5ea
JB
851
// Runs the default task and three named tasks on a worker file that exposes
// several task functions, and checks each result.
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task name given: the worker's default task is executed.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  // Destroy the pool, like every other test in this suite, so the forked
  // cluster workers do not outlive the test.
  await pool.destroy()
})
3ec964d6 868})