refactor: cleanup main worker checks
[poolifier.git] / tests / pools / abstract / abstract-pool.test.js
CommitLineData
85aeb3f3 1const { MessageChannel } = require('worker_threads')
a61a0724 2const { expect } = require('expect')
e843b904 3const {
70a4f5ea 4 DynamicClusterPool,
9e619829 5 DynamicThreadPool,
aee46736 6 FixedClusterPool,
e843b904 7 FixedThreadPool,
aee46736 8 PoolEvents,
184855e6 9 PoolTypes,
3d6dd312 10 WorkerChoiceStrategies,
184855e6 11 WorkerTypes
cdace0e5 12} = require('../../../lib')
78099a15 13const { CircularArray } = require('../../../lib/circular-array')
29ee7e9a 14const { Queue } = require('../../../lib/queue')
23ccf9d7 15const { version } = require('../../../package.json')
2431bdb4 16const { waitPoolEvents } = require('../../test-utils')
e1ffb94f
JB
17
18describe('Abstract pool test suite', () => {
fc027381 19 const numberOfWorkers = 2
/**
 * Fixed thread pool stub whose `isMain()` always answers `false`, used to
 * simulate a pool being created from a non main thread/process.
 */
class StubPoolWithIsMain extends FixedThreadPool {
  /**
   * @returns {boolean} Always `false`.
   */
  isMain () {
    const runsInMain = false
    return runsInMain
  }
}
3ec964d6 25
it('Simulate pool creation from a non main thread/process', () => {
  // The stub reports a non main context, so building the pool is expected to throw.
  const createPoolFromWorker = () =>
    new StubPoolWithIsMain(
      numberOfWorkers,
      './tests/worker-files/thread/testWorker.js',
      { errorHandler: (e) => console.error(e) }
    )
  expect(createPoolFromWorker).toThrowError(
    'Cannot start a pool from a worker!'
  )
})
c510fea7
APA
38
it('Verify that filePath is checked', () => {
  const expectedError = new Error(
    'Please specify a file with a worker implementation'
  )
  // Missing, empty and non string file paths must all raise the same error.
  const invalidFilePathFactories = [
    () => new FixedThreadPool(numberOfWorkers),
    () => new FixedThreadPool(numberOfWorkers, ''),
    () => new FixedThreadPool(numberOfWorkers, 0),
    () => new FixedThreadPool(numberOfWorkers, true)
  ]
  for (const createPool of invalidFilePathFactories) {
    expect(createPool).toThrowError(expectedError)
  }
  // A well formed path pointing to a file that does not exist gets its own error.
  const createPoolWithUnknownFile = () =>
    new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
  expect(createPoolWithUnknownFile).toThrowError(
    new Error("Cannot find the worker file './dummyWorker.ts'")
  )
})
59
it('Verify that numberOfWorkers is checked', () => {
  // Constructing with no argument at all must complain about the missing number of workers.
  const createPoolWithoutSize = () => new FixedThreadPool()
  expect(createPoolWithoutSize).toThrowError(
    'Cannot instantiate a pool without specifying the number of workers'
  )
})
65
it('Verify that a negative number of workers is checked', () => {
  const createNegativeSizedPool = () =>
    new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
  const expectedError = new RangeError(
    'Cannot instantiate a pool with a negative number of workers'
  )
  expect(createNegativeSizedPool).toThrowError(expectedError)
})
76
it('Verify that a non integer number of workers is checked', () => {
  const createFractionalSizedPool = () =>
    new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
  const expectedError = new TypeError(
    'Cannot instantiate a pool with a non safe integer number of workers'
  )
  expect(createFractionalSizedPool).toThrowError(expectedError)
})
7c0ba920 87
it('Verify that dynamic pool sizing is checked', () => {
  const clusterWorkerFile = './tests/worker-files/cluster/testWorker.js'
  const threadWorkerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid (min, max) sizing with the error it must raise.
  const invalidSizings = [
    [
      () => new DynamicClusterPool(1, undefined, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool without specifying the maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(0.5, 1, threadWorkerFile),
      new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    ],
    [
      () => new DynamicClusterPool(0, 0.5, clusterWorkerFile),
      new TypeError(
        'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
      )
    ],
    [
      () => new DynamicThreadPool(2, 1, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
      )
    ],
    [
      () => new DynamicClusterPool(1, 1, clusterWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
      )
    ],
    [
      () => new DynamicThreadPool(0, 0, threadWorkerFile),
      new RangeError(
        'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
      )
    ]
  ]
  for (const [createPool, expectedError] of invalidSizings) {
    expect(createPool).toThrowError(expectedError)
  }
})
154
it('Verify that pool options are checked', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'

  // Pool built without any option: check the values the pool ends up with.
  const defaultPool = new FixedThreadPool(numberOfWorkers, workerFile)
  expect(defaultPool.emitter).toBeDefined()
  expect(defaultPool.opts.enableEvents).toBe(true)
  expect(defaultPool.opts.restartWorkerOnError).toBe(true)
  expect(defaultPool.opts.enableTasksQueue).toBe(false)
  expect(defaultPool.opts.tasksQueueOptions).toBeUndefined()
  expect(defaultPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.ROUND_ROBIN
  )
  expect(defaultPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  for (const handlerName of [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]) {
    expect(defaultPool.opts[handlerName]).toBeUndefined()
  }
  await defaultPool.destroy()

  // Pool built with every option given explicitly: each one must be kept as passed.
  const testHandler = () => console.info('test handler executed')
  const customPool = new FixedThreadPool(numberOfWorkers, workerFile, {
    workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
    workerChoiceStrategyOptions: {
      runTime: { median: true },
      weights: { 0: 300, 1: 200 }
    },
    enableEvents: false,
    restartWorkerOnError: false,
    enableTasksQueue: true,
    tasksQueueOptions: { concurrency: 2 },
    messageHandler: testHandler,
    errorHandler: testHandler,
    onlineHandler: testHandler,
    exitHandler: testHandler
  })
  expect(customPool.emitter).toBeUndefined()
  expect(customPool.opts.enableEvents).toBe(false)
  expect(customPool.opts.restartWorkerOnError).toBe(false)
  expect(customPool.opts.enableTasksQueue).toBe(true)
  expect(customPool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  expect(customPool.opts.workerChoiceStrategy).toBe(
    WorkerChoiceStrategies.LEAST_USED
  )
  expect(customPool.opts.workerChoiceStrategyOptions).toStrictEqual({
    runTime: { median: true },
    weights: { 0: 300, 1: 200 }
  })
  for (const handlerName of [
    'messageHandler',
    'errorHandler',
    'onlineHandler',
    'exitHandler'
  ]) {
    expect(customPool.opts[handlerName]).toStrictEqual(testHandler)
  }
  await customPool.destroy()
})
216
it('Verify that pool options are validated', async () => {
  const workerFile = './tests/worker-files/thread/testWorker.js'
  // Each entry pairs an invalid options object with the error message it must trigger.
  const invalidOptionsCases = [
    [
      { workerChoiceStrategy: 'invalidStrategy' },
      "Invalid worker choice strategy 'invalidStrategy'"
    ],
    [
      { workerChoiceStrategyOptions: 'invalidOptions' },
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { workerChoiceStrategyOptions: { weights: {} } },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' } },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0 } },
      "Invalid worker tasks concurrency '0'"
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: 'invalidTasksQueueOptions' },
      'Invalid tasks queue options: must be a plain object'
    ],
    [
      { enableTasksQueue: true, tasksQueueOptions: { concurrency: 0.2 } },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [poolOptions, expectedMessage] of invalidOptionsCases) {
    expect(
      () => new FixedThreadPool(numberOfWorkers, workerFile, poolOptions)
    ).toThrowError(expectedMessage)
  }
})
298
it('Verify that pool worker choice strategy options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
  )

  // Asserts that the pool and every registered strategy hold the given options.
  const expectStrategyOptions = (expectedOptions) => {
    expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual(
      expectedOptions
    )
    for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
      .workerChoiceStrategies) {
      expect(workerChoiceStrategy.opts).toStrictEqual(expectedOptions)
    }
  }

  // Asserts the task statistics requirements: in the expected values below,
  // runTime and elu require the average unless the median is requested.
  const expectTaskStatisticsRequirements = (median) => {
    expect(
      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    ).toStrictEqual({
      runTime: { aggregate: true, average: !median, median },
      waitTime: { aggregate: false, average: false, median: false },
      elu: { aggregate: true, average: !median, median }
    })
  }

  // Initial state.
  expectStrategyOptions({
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false }
  })
  expectTaskStatisticsRequirements(false)

  // Switch runTime and elu to median.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectStrategyOptions({
    runTime: { median: true },
    elu: { median: true }
  })
  expectTaskStatisticsRequirements(true)

  // Switch runTime and elu back to average.
  pool.setWorkerChoiceStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectStrategyOptions({
    runTime: { median: false },
    elu: { median: false }
  })
  expectTaskStatisticsRequirements(false)

  // Invalid options must be rejected by the setter.
  const invalidOptionsCases = [
    [
      'invalidWorkerChoiceStrategyOptions',
      'Invalid worker choice strategy options: must be a plain object'
    ],
    [
      { weights: {} },
      'Invalid worker choice strategy options: must have a weight for each worker node'
    ],
    [
      { measurement: 'invalidMeasurement' },
      "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(() =>
      pool.setWorkerChoiceStrategyOptions(invalidOptions)
    ).toThrowError(expectedMessage)
  }
  await pool.destroy()
})
422
it('Verify that pool tasks queue can be enabled/disabled', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Asserts that the tasks queue is disabled and carries no options.
  const expectTasksQueueDisabled = () => {
    expect(pool.opts.enableTasksQueue).toBe(false)
    expect(pool.opts.tasksQueueOptions).toBeUndefined()
  }
  // Asserts that the tasks queue is enabled with the given concurrency.
  const expectTasksQueueEnabled = (concurrency) => {
    expect(pool.opts.enableTasksQueue).toBe(true)
    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency })
  }

  expectTasksQueueDisabled()
  pool.enableTasksQueue(true)
  expectTasksQueueEnabled(1)
  pool.enableTasksQueue(true, { concurrency: 2 })
  expectTasksQueueEnabled(2)
  pool.enableTasksQueue(false)
  expectTasksQueueDisabled()
  await pool.destroy()
})
441
it('Verify that pool tasks queue options can be set', async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js',
    { enableTasksQueue: true }
  )
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1 })
  pool.setTasksQueueOptions({ concurrency: 2 })
  expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
  // Invalid tasks queue options must be rejected by the setter.
  const invalidOptionsCases = [
    [
      'invalidTasksQueueOptions',
      'Invalid tasks queue options: must be a plain object'
    ],
    [{ concurrency: 0 }, "Invalid worker tasks concurrency '0'"],
    [
      { concurrency: 0.2 },
      'Invalid worker tasks concurrency: must be an integer'
    ]
  ]
  for (const [invalidOptions, expectedMessage] of invalidOptionsCases) {
    expect(() => pool.setTasksQueueOptions(invalidOptions)).toThrowError(
      expectedMessage
    )
  }
  await pool.destroy()
})
462
6b27d407
JB
it('Verify that pool info is set', async () => {
  // Task and busy counters expected on a pool that has not executed anything yet.
  const untouchedCounters = {
    busyWorkerNodes: 0,
    executedTasks: 0,
    executingTasks: 0,
    failedTasks: 0
  }

  const fixedPool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expect(fixedPool.info).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: numberOfWorkers,
    maxSize: numberOfWorkers,
    workerNodes: numberOfWorkers,
    idleWorkerNodes: numberOfWorkers,
    ...untouchedCounters
  })
  await fixedPool.destroy()

  const minimumSize = Math.floor(numberOfWorkers / 2)
  const dynamicPool = new DynamicClusterPool(
    minimumSize,
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expect(dynamicPool.info).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: minimumSize,
    maxSize: numberOfWorkers,
    workerNodes: minimumSize,
    idleWorkerNodes: minimumSize,
    ...untouchedCounters
  })
  await dynamicPool.destroy()
})
506
it('Verify that pool worker tasks usage are initialized', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Matches any measurement statistics made only of a CircularArray history.
  const historyOnly = { history: expect.any(CircularArray) }
  const initialUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly,
    waitTime: historyOnly,
    elu: {
      idle: historyOnly,
      active: historyOnly
    }
  }
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(initialUsage)
  }
  await pool.destroy()
})
539
2431bdb4
JB
it('Verify that pool worker tasks queue are initialized', async () => {
  // Asserts that every worker node of the pool owns an empty tasks queue.
  const expectEmptyTasksQueues = (pool) => {
    for (const workerNode of pool.workerNodes) {
      expect(workerNode.tasksQueue).toBeDefined()
      expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
      expect(workerNode.tasksQueue.size).toBe(0)
      expect(workerNode.tasksQueue.maxSize).toBe(0)
    }
  }
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  await pool.destroy()
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  expectEmptyTasksQueues(pool)
  // Destroy the second pool too, so its workers are not left behind once the test ends.
  await pool.destroy()
})
564
it('Verify that pool worker info are initialized', async () => {
  // Cluster workers: info holds the id, type, dynamic flag and ready flag only.
  let pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.cluster,
      dynamic: false,
      ready: true
    })
  }
  await pool.destroy()
  // Thread workers: info additionally carries a message channel.
  pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.info).toStrictEqual({
      id: expect.any(Number),
      type: WorkerTypes.thread,
      dynamic: false,
      ready: true,
      messageChannel: expect.any(MessageChannel)
    })
  }
  // Destroy the second pool too, so its workers are not left behind once the test ends.
  await pool.destroy()
})
594
it('Verify that pool worker tasks usage are computed', async () => {
  const pool = new FixedClusterPool(
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Matches any measurement statistics made only of a CircularArray history.
  const historyOnly = { history: expect.any(CircularArray) }
  // Builds the usage expected on each worker node for the given task counters.
  const usageWith = (executed, executing) => ({
    tasks: {
      executed,
      executing,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly,
    waitTime: historyOnly,
    elu: {
      idle: historyOnly,
      active: historyOnly
    }
  })
  const maxMultiplier = 2
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  // Tasks submitted but not yet awaited: all of them are counted as executing.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWith(0, maxMultiplier))
  }
  await Promise.all(promises)
  // Tasks settled: they have all moved to the executed counter.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWith(maxMultiplier, 0))
  }
  await pool.destroy()
})
658
it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Matches any measurement statistics made only of a CircularArray history.
  const historyOnly = { history: expect.any(CircularArray) }
  // Builds the usage expected on each worker node for the given executed counter matcher.
  const usageWithExecuted = (executed) => ({
    tasks: {
      executed,
      executing: 0,
      queued: 0,
      maxQueued: 0,
      failed: 0
    },
    runTime: historyOnly,
    waitTime: historyOnly,
    elu: {
      idle: historyOnly,
      active: historyOnly
    }
  })
  // Asserts that none of the worker node measurement histories holds a value.
  const expectEmptyHistories = (workerNode) => {
    expect(workerNode.usage.runTime.history.length).toBe(0)
    expect(workerNode.usage.waitTime.history.length).toBe(0)
    expect(workerNode.usage.elu.idle.history.length).toBe(0)
    expect(workerNode.usage.elu.active.history.length).toBe(0)
  }
  const maxMultiplier = 2
  const promises = new Set()
  for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // Before the strategy change: every worker node has executed at least one task.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(
      usageWithExecuted(expect.any(Number))
    )
    expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
    expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expectEmptyHistories(workerNode)
  }
  pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
  // After the strategy change: the executed counter is back to zero.
  for (const workerNode of pool.workerNodes) {
    expect(workerNode.usage).toStrictEqual(usageWithExecuted(0))
    expectEmptyHistories(workerNode)
  }
  await pool.destroy()
})
734
164d950a
JB
it("Verify that pool event emitter 'full' event can register a callback", async () => {
  const pool = new DynamicThreadPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Count the emitted `full` events and keep the payload of the last one.
  let poolFull = 0
  let poolInfo
  pool.emitter.on(PoolEvents.full, (info) => {
    poolFull += 1
    poolInfo = info
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
  // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
  expect(poolFull).toBe(numberOfWorkers * 2 - 1)
  const anyNumber = expect.any(Number)
  expect(poolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
772
it("Verify that pool event emitter 'ready' event can register a callback", async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testWorker.js'
  )
  // Count the emitted `ready` events and keep the payload of the last one.
  let poolReady = 0
  let poolInfo
  pool.emitter.on(PoolEvents.ready, (info) => {
    poolReady += 1
    poolInfo = info
  })
  await waitPoolEvents(pool, PoolEvents.ready, 1)
  expect(poolReady).toBe(1)
  const anyNumber = expect.any(Number)
  expect(poolInfo).toStrictEqual({
    version,
    type: PoolTypes.dynamic,
    worker: WorkerTypes.cluster,
    ready: true,
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
804
it("Verify that pool event emitter 'busy' event can register a callback", async () => {
  const pool = new FixedThreadPool(
    numberOfWorkers,
    './tests/worker-files/thread/testWorker.js'
  )
  // Count the emitted `busy` events and keep the payload of the last one.
  let poolBusy = 0
  let poolInfo
  pool.emitter.on(PoolEvents.busy, (info) => {
    poolBusy += 1
    poolInfo = info
  })
  const submittedTasks = Array.from({ length: numberOfWorkers * 2 }, () =>
    pool.execute()
  )
  await Promise.all(submittedTasks)
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfWorkers + 1)
  const anyNumber = expect.any(Number)
  expect(poolInfo).toStrictEqual({
    version,
    type: PoolTypes.fixed,
    worker: WorkerTypes.thread,
    ready: expect.any(Boolean),
    strategy: WorkerChoiceStrategies.ROUND_ROBIN,
    minSize: anyNumber,
    maxSize: anyNumber,
    workerNodes: anyNumber,
    idleWorkerNodes: anyNumber,
    busyWorkerNodes: anyNumber,
    executedTasks: anyNumber,
    executingTasks: anyNumber,
    failedTasks: anyNumber
  })
  await pool.destroy()
})
70a4f5ea
JB
841
it('Verify that multiple tasks worker is working', async () => {
  const pool = new DynamicClusterPool(
    Math.floor(numberOfWorkers / 2),
    numberOfWorkers,
    './tests/worker-files/cluster/testMultiTasksWorker.js'
  )
  const data = { n: 10 }
  // No task function name given: the result is expected to match the
  // 'jsonIntegerSerialization' one below.
  const result0 = await pool.execute(data)
  expect(result0).toStrictEqual({ ok: 1 })
  const result1 = await pool.execute(data, 'jsonIntegerSerialization')
  expect(result1).toStrictEqual({ ok: 1 })
  const result2 = await pool.execute(data, 'factorial')
  expect(result2).toBe(3628800)
  const result3 = await pool.execute(data, 'fibonacci')
  expect(result3).toBe(55)
  // Destroy the pool, so its workers are not left behind once the test ends.
  await pool.destroy()
})
3ec964d6 858})