1 // eslint-disable-next-line n/no-unsupported-features/node-builtins
2 import { createHook, executionAsyncId } from 'node:async_hooks'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { readFileSync } from 'node:fs'
5 import { dirname, join } from 'node:path'
6 import { fileURLToPath } from 'node:url'
8 import { expect } from 'expect'
9 import { restore, stub } from 'sinon'
11 import { CircularBuffer } from '../../lib/circular-buffer.cjs'
19 WorkerChoiceStrategies,
21 } from '../../lib/index.cjs'
22 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
23 import { PriorityQueue } from '../../lib/priority-queue.cjs'
24 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
25 import { waitPoolEvents } from '../test-utils.cjs'
27 describe('Abstract pool test suite', () => {
28 const version = JSON.parse(
30 join(dirname(fileURLToPath(import.meta.url)), '../..', 'package.json'),
34 const numberOfWorkers = 2
35 class StubPoolWithIsMain extends FixedThreadPool {
45 it('Verify that pool can be created and destroyed', async () => {
46 const pool = new FixedThreadPool(
48 './tests/worker-files/thread/testWorker.mjs'
50 expect(pool).toBeInstanceOf(FixedThreadPool)
54 it('Verify that pool cannot be created from a non main thread/process', () => {
57 new StubPoolWithIsMain(
59 './tests/worker-files/thread/testWorker.mjs',
61 errorHandler: e => console.error(e)
66 'Cannot start a pool from a worker with the same type as the pool'
71 it('Verify that pool statuses properties are set', async () => {
72 const pool = new FixedThreadPool(
74 './tests/worker-files/thread/testWorker.mjs'
76 expect(pool.started).toBe(true)
77 expect(pool.starting).toBe(false)
78 expect(pool.destroying).toBe(false)
82 it('Verify that filePath is checked', () => {
83 expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
84 new TypeError('The worker file path must be specified')
86 expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
87 new TypeError('The worker file path must be a string')
90 () => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
91 ).toThrow(new Error("Cannot find the worker file './dummyWorker.ts'"))
94 it('Verify that numberOfWorkers is checked', () => {
99 './tests/worker-files/thread/testWorker.mjs'
103 'Cannot instantiate a pool without specifying the number of workers'
108 it('Verify that a negative number of workers is checked', () => {
111 new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
114 'Cannot instantiate a pool with a negative number of workers'
119 it('Verify that a non integer number of workers is checked', () => {
122 new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.mjs')
125 'Cannot instantiate a pool with a non safe integer number of workers'
130 it('Verify that pool arguments number and pool type are checked', () => {
135 './tests/worker-files/thread/testWorker.mjs',
141 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
146 it('Verify that dynamic pool sizing is checked', () => {
149 new DynamicClusterPool(
152 './tests/worker-files/cluster/testWorker.cjs'
156 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
161 new DynamicThreadPool(
164 './tests/worker-files/thread/testWorker.mjs'
168 'Cannot instantiate a pool with a non safe integer number of workers'
173 new DynamicClusterPool(
176 './tests/worker-files/cluster/testWorker.cjs'
180 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 new DynamicThreadPool(
188 './tests/worker-files/thread/testWorker.mjs'
192 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 new DynamicThreadPool(
200 './tests/worker-files/thread/testWorker.mjs'
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
209 new DynamicClusterPool(
212 './tests/worker-files/cluster/testWorker.cjs'
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
221 it('Verify that pool options are checked', async () => {
222 let pool = new FixedThreadPool(
224 './tests/worker-files/thread/testWorker.mjs'
226 expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
227 expect(pool.emitter.eventNames()).toStrictEqual([])
228 expect(pool.opts).toStrictEqual({
231 restartWorkerOnError: true,
232 enableTasksQueue: false,
233 workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
235 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
236 .workerChoiceStrategies) {
237 expect(workerChoiceStrategy.opts).toStrictEqual({
238 runTime: { median: false },
239 waitTime: { median: false },
240 elu: { median: false },
241 weights: expect.objectContaining({
242 0: expect.any(Number),
243 [pool.info.maxSize - 1]: expect.any(Number)
248 const testHandler = () => console.info('test handler executed')
249 pool = new FixedThreadPool(
251 './tests/worker-files/thread/testWorker.mjs',
253 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
254 workerChoiceStrategyOptions: {
255 runTime: { median: true },
256 weights: { 0: 300, 1: 200 }
259 restartWorkerOnError: false,
260 enableTasksQueue: true,
261 tasksQueueOptions: { concurrency: 2 },
262 messageHandler: testHandler,
263 errorHandler: testHandler,
264 onlineHandler: testHandler,
265 exitHandler: testHandler
268 expect(pool.emitter).toBeUndefined()
269 expect(pool.opts).toStrictEqual({
272 restartWorkerOnError: false,
273 enableTasksQueue: true,
276 size: Math.pow(numberOfWorkers, 2),
278 tasksStealingOnBackPressure: false,
279 tasksFinishedTimeout: 2000
281 workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
282 workerChoiceStrategyOptions: {
283 runTime: { median: true },
284 weights: { 0: 300, 1: 200 }
286 onlineHandler: testHandler,
287 messageHandler: testHandler,
288 errorHandler: testHandler,
289 exitHandler: testHandler
291 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
292 .workerChoiceStrategies) {
293 expect(workerChoiceStrategy.opts).toStrictEqual({
294 runTime: { median: true },
295 waitTime: { median: false },
296 elu: { median: false },
297 weights: { 0: 300, 1: 200 }
303 it('Verify that pool options are validated', () => {
308 './tests/worker-files/thread/testWorker.mjs',
310 workerChoiceStrategy: 'invalidStrategy'
313 ).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
318 './tests/worker-files/thread/testWorker.mjs',
320 workerChoiceStrategyOptions: { weights: {} }
325 'Invalid worker choice strategy options: must have a weight for each worker node'
332 './tests/worker-files/thread/testWorker.mjs',
334 workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
339 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
346 './tests/worker-files/thread/testWorker.mjs',
348 enableTasksQueue: true,
349 tasksQueueOptions: 'invalidTasksQueueOptions'
353 new TypeError('Invalid tasks queue options: must be a plain object')
359 './tests/worker-files/thread/testWorker.mjs',
361 enableTasksQueue: true,
362 tasksQueueOptions: { concurrency: 0 }
367 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
374 './tests/worker-files/thread/testWorker.mjs',
376 enableTasksQueue: true,
377 tasksQueueOptions: { concurrency: -1 }
382 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
389 './tests/worker-files/thread/testWorker.mjs',
391 enableTasksQueue: true,
392 tasksQueueOptions: { concurrency: 0.2 }
396 new TypeError('Invalid worker node tasks concurrency: must be an integer')
402 './tests/worker-files/thread/testWorker.mjs',
404 enableTasksQueue: true,
405 tasksQueueOptions: { size: 0 }
410 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
417 './tests/worker-files/thread/testWorker.mjs',
419 enableTasksQueue: true,
420 tasksQueueOptions: { size: -1 }
425 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
432 './tests/worker-files/thread/testWorker.mjs',
434 enableTasksQueue: true,
435 tasksQueueOptions: { size: 0.2 }
439 new TypeError('Invalid worker node tasks queue size: must be an integer')
443 it('Verify that pool worker choice strategy options can be set', async () => {
444 const pool = new FixedThreadPool(
446 './tests/worker-files/thread/testWorker.mjs',
447 { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
449 expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
450 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
451 .workerChoiceStrategies) {
452 expect(workerChoiceStrategy.opts).toStrictEqual({
453 runTime: { median: false },
454 waitTime: { median: false },
455 elu: { median: false },
456 weights: expect.objectContaining({
457 0: expect.any(Number),
458 [pool.info.maxSize - 1]: expect.any(Number)
463 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
481 pool.setWorkerChoiceStrategyOptions({
482 runTime: { median: true },
483 elu: { median: true }
485 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
486 runTime: { median: true },
487 elu: { median: true }
489 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
490 .workerChoiceStrategies) {
491 expect(workerChoiceStrategy.opts).toStrictEqual({
492 runTime: { median: true },
493 waitTime: { median: false },
494 elu: { median: true },
495 weights: expect.objectContaining({
496 0: expect.any(Number),
497 [pool.info.maxSize - 1]: expect.any(Number)
502 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
520 pool.setWorkerChoiceStrategyOptions({
521 runTime: { median: false },
522 elu: { median: false }
524 expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
525 runTime: { median: false },
526 elu: { median: false }
528 for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
529 .workerChoiceStrategies) {
530 expect(workerChoiceStrategy.opts).toStrictEqual({
531 runTime: { median: false },
532 waitTime: { median: false },
533 elu: { median: false },
534 weights: expect.objectContaining({
535 0: expect.any(Number),
536 [pool.info.maxSize - 1]: expect.any(Number)
541 pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
560 pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
563 'Invalid worker choice strategy options: must be a plain object'
566 expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
568 'Invalid worker choice strategy options: must have a weight for each worker node'
572 pool.setWorkerChoiceStrategyOptions({ measurement: 'invalidMeasurement' })
575 "Invalid worker choice strategy options: invalid measurement 'invalidMeasurement'"
581 it('Verify that pool tasks queue can be enabled/disabled', async () => {
582 const pool = new FixedThreadPool(
584 './tests/worker-files/thread/testWorker.mjs'
586 expect(pool.opts.enableTasksQueue).toBe(false)
587 expect(pool.opts.tasksQueueOptions).toBeUndefined()
588 pool.enableTasksQueue(true)
589 expect(pool.opts.enableTasksQueue).toBe(true)
590 expect(pool.opts.tasksQueueOptions).toStrictEqual({
592 size: Math.pow(numberOfWorkers, 2),
594 tasksStealingOnBackPressure: false,
595 tasksFinishedTimeout: 2000
597 pool.enableTasksQueue(true, { concurrency: 2 })
598 expect(pool.opts.enableTasksQueue).toBe(true)
599 expect(pool.opts.tasksQueueOptions).toStrictEqual({
601 size: Math.pow(numberOfWorkers, 2),
603 tasksStealingOnBackPressure: false,
604 tasksFinishedTimeout: 2000
606 pool.enableTasksQueue(false)
607 expect(pool.opts.enableTasksQueue).toBe(false)
608 expect(pool.opts.tasksQueueOptions).toBeUndefined()
612 it('Verify that pool tasks queue options can be set', async () => {
613 const pool = new FixedThreadPool(
615 './tests/worker-files/thread/testWorker.mjs',
616 { enableTasksQueue: true }
618 expect(pool.opts.tasksQueueOptions).toStrictEqual({
620 size: Math.pow(numberOfWorkers, 2),
622 tasksStealingOnBackPressure: false,
623 tasksFinishedTimeout: 2000
625 for (const workerNode of pool.workerNodes) {
626 expect(workerNode.tasksQueueBackPressureSize).toBe(
627 pool.opts.tasksQueueOptions.size
630 pool.setTasksQueueOptions({
634 tasksStealingOnBackPressure: false,
635 tasksFinishedTimeout: 3000
637 expect(pool.opts.tasksQueueOptions).toStrictEqual({
641 tasksStealingOnBackPressure: false,
642 tasksFinishedTimeout: 3000
644 for (const workerNode of pool.workerNodes) {
645 expect(workerNode.tasksQueueBackPressureSize).toBe(
646 pool.opts.tasksQueueOptions.size
649 pool.setTasksQueueOptions({
652 tasksStealingOnBackPressure: true
654 expect(pool.opts.tasksQueueOptions).toStrictEqual({
656 size: Math.pow(numberOfWorkers, 2),
658 tasksStealingOnBackPressure: true,
659 tasksFinishedTimeout: 2000
661 for (const workerNode of pool.workerNodes) {
662 expect(workerNode.tasksQueueBackPressureSize).toBe(
663 pool.opts.tasksQueueOptions.size
666 expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
667 new TypeError('Invalid tasks queue options: must be a plain object')
669 expect(() => pool.setTasksQueueOptions({ concurrency: 0 })).toThrow(
671 'Invalid worker node tasks concurrency: 0 is a negative integer or zero'
674 expect(() => pool.setTasksQueueOptions({ concurrency: -1 })).toThrow(
676 'Invalid worker node tasks concurrency: -1 is a negative integer or zero'
679 expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrow(
680 new TypeError('Invalid worker node tasks concurrency: must be an integer')
682 expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrow(
684 'Invalid worker node tasks queue size: 0 is a negative integer or zero'
687 expect(() => pool.setTasksQueueOptions({ size: -1 })).toThrow(
689 'Invalid worker node tasks queue size: -1 is a negative integer or zero'
692 expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow(
693 new TypeError('Invalid worker node tasks queue size: must be an integer')
698 it('Verify that pool info is set', async () => {
699 let pool = new FixedThreadPool(
701 './tests/worker-files/thread/testWorker.mjs'
703 expect(pool.info).toStrictEqual({
705 type: PoolTypes.fixed,
706 worker: WorkerTypes.thread,
709 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
711 minSize: numberOfWorkers,
712 maxSize: numberOfWorkers,
713 workerNodes: numberOfWorkers,
714 idleWorkerNodes: numberOfWorkers,
721 pool = new DynamicClusterPool(
722 Math.floor(numberOfWorkers / 2),
724 './tests/worker-files/cluster/testWorker.cjs'
726 expect(pool.info).toStrictEqual({
728 type: PoolTypes.dynamic,
729 worker: WorkerTypes.cluster,
732 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
734 minSize: Math.floor(numberOfWorkers / 2),
735 maxSize: numberOfWorkers,
736 workerNodes: Math.floor(numberOfWorkers / 2),
737 idleWorkerNodes: Math.floor(numberOfWorkers / 2),
746 it('Verify that pool worker tasks usage are initialized', async () => {
747 const pool = new FixedClusterPool(
749 './tests/worker-files/cluster/testWorker.cjs'
751 for (const workerNode of pool.workerNodes) {
752 expect(workerNode).toBeInstanceOf(WorkerNode)
753 expect(workerNode.usage).toStrictEqual({
759 sequentiallyStolen: 0,
764 history: expect.any(CircularBuffer)
767 history: expect.any(CircularBuffer)
771 history: expect.any(CircularBuffer)
774 history: expect.any(CircularBuffer)
782 it('Verify that pool worker tasks queue are initialized', async () => {
783 let pool = new FixedClusterPool(
785 './tests/worker-files/cluster/testWorker.cjs'
787 for (const workerNode of pool.workerNodes) {
788 expect(workerNode).toBeInstanceOf(WorkerNode)
789 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
790 expect(workerNode.tasksQueue.size).toBe(0)
791 expect(workerNode.tasksQueue.maxSize).toBe(0)
792 expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2)
795 pool = new DynamicThreadPool(
796 Math.floor(numberOfWorkers / 2),
798 './tests/worker-files/thread/testWorker.mjs'
800 for (const workerNode of pool.workerNodes) {
801 expect(workerNode).toBeInstanceOf(WorkerNode)
802 expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
803 expect(workerNode.tasksQueue.size).toBe(0)
804 expect(workerNode.tasksQueue.maxSize).toBe(0)
805 expect(workerNode.tasksQueue.bucketSize).toBe(numberOfWorkers * 2)
810 it('Verify that pool worker info are initialized', async () => {
811 let pool = new FixedClusterPool(
813 './tests/worker-files/cluster/testWorker.cjs'
815 for (const workerNode of pool.workerNodes) {
816 expect(workerNode).toBeInstanceOf(WorkerNode)
817 expect(workerNode.info).toStrictEqual({
818 id: expect.any(Number),
819 type: WorkerTypes.cluster,
827 pool = new DynamicThreadPool(
828 Math.floor(numberOfWorkers / 2),
830 './tests/worker-files/thread/testWorker.mjs'
832 for (const workerNode of pool.workerNodes) {
833 expect(workerNode).toBeInstanceOf(WorkerNode)
834 expect(workerNode.info).toStrictEqual({
835 id: expect.any(Number),
836 type: WorkerTypes.thread,
846 it('Verify that pool statuses are checked at start or destroy', async () => {
847 const pool = new FixedThreadPool(
849 './tests/worker-files/thread/testWorker.mjs'
851 expect(pool.info.started).toBe(true)
852 expect(pool.info.ready).toBe(true)
853 expect(() => pool.start()).toThrow(
854 new Error('Cannot start an already started pool')
857 expect(pool.info.started).toBe(false)
858 expect(pool.info.ready).toBe(false)
859 await expect(pool.destroy()).rejects.toThrow(
860 new Error('Cannot destroy an already destroyed pool')
864 it('Verify that pool can be started after initialization', async () => {
865 const pool = new FixedClusterPool(
867 './tests/worker-files/cluster/testWorker.cjs',
872 expect(pool.info.started).toBe(false)
873 expect(pool.info.ready).toBe(false)
874 expect(pool.workerNodes).toStrictEqual([])
875 expect(pool.readyEventEmitted).toBe(false)
876 await expect(pool.execute()).rejects.toThrow(
877 new Error('Cannot execute a task on not started pool')
880 expect(pool.info.started).toBe(true)
881 expect(pool.info.ready).toBe(true)
882 await waitPoolEvents(pool, PoolEvents.ready, 1)
883 expect(pool.readyEventEmitted).toBe(true)
884 expect(pool.workerNodes.length).toBe(numberOfWorkers)
885 for (const workerNode of pool.workerNodes) {
886 expect(workerNode).toBeInstanceOf(WorkerNode)
891 it('Verify that pool execute() arguments are checked', async () => {
892 const pool = new FixedClusterPool(
894 './tests/worker-files/cluster/testWorker.cjs'
896 await expect(pool.execute(undefined, 0)).rejects.toThrow(
897 new TypeError('name argument must be a string')
899 await expect(pool.execute(undefined, '')).rejects.toThrow(
900 new TypeError('name argument must not be an empty string')
902 await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
903 new TypeError('transferList argument must be an array')
905 await expect(pool.execute(undefined, 'unknown')).rejects.toBe(
906 "Task function 'unknown' not found"
909 await expect(pool.execute()).rejects.toThrow(
910 new Error('Cannot execute a task on not started pool')
914 it('Verify that pool worker tasks usage are computed', async () => {
915 const pool = new FixedClusterPool(
917 './tests/worker-files/cluster/testWorker.cjs'
919 const promises = new Set()
920 const maxMultiplier = 2
921 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
922 promises.add(pool.execute())
924 for (const workerNode of pool.workerNodes) {
925 expect(workerNode.usage).toStrictEqual({
928 executing: maxMultiplier,
931 sequentiallyStolen: 0,
936 history: expect.any(CircularBuffer)
939 history: expect.any(CircularBuffer)
943 history: expect.any(CircularBuffer)
946 history: expect.any(CircularBuffer)
951 await Promise.all(promises)
952 for (const workerNode of pool.workerNodes) {
953 expect(workerNode.usage).toStrictEqual({
955 executed: maxMultiplier,
959 sequentiallyStolen: 0,
964 history: expect.any(CircularBuffer)
967 history: expect.any(CircularBuffer)
971 history: expect.any(CircularBuffer)
974 history: expect.any(CircularBuffer)
982 it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
983 const pool = new DynamicThreadPool(
984 Math.floor(numberOfWorkers / 2),
986 './tests/worker-files/thread/testWorker.mjs'
988 const promises = new Set()
989 const maxMultiplier = 2
990 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
991 promises.add(pool.execute())
993 await Promise.all(promises)
994 for (const workerNode of pool.workerNodes) {
995 expect(workerNode.usage).toStrictEqual({
997 executed: expect.any(Number),
1001 sequentiallyStolen: 0,
1006 history: expect.any(CircularBuffer)
1009 history: expect.any(CircularBuffer)
1013 history: expect.any(CircularBuffer)
1016 history: expect.any(CircularBuffer)
1020 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1021 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1022 numberOfWorkers * maxMultiplier
1025 pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
1026 for (const workerNode of pool.workerNodes) {
1027 expect(workerNode.usage).toStrictEqual({
1029 executed: expect.any(Number),
1033 sequentiallyStolen: 0,
1038 history: expect.any(CircularBuffer)
1041 history: expect.any(CircularBuffer)
1045 history: expect.any(CircularBuffer)
1048 history: expect.any(CircularBuffer)
1052 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1053 expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
1054 numberOfWorkers * maxMultiplier
1057 await pool.destroy()
1060 it("Verify that pool event emitter 'ready' event can register a callback", async () => {
1061 const pool = new DynamicClusterPool(
1062 Math.floor(numberOfWorkers / 2),
1064 './tests/worker-files/cluster/testWorker.cjs'
1066 expect(pool.emitter.eventNames()).toStrictEqual([])
1069 pool.emitter.on(PoolEvents.ready, info => {
1073 await waitPoolEvents(pool, PoolEvents.ready, 1)
1074 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
1075 expect(poolReady).toBe(1)
1076 expect(poolInfo).toStrictEqual({
1078 type: PoolTypes.dynamic,
1079 worker: WorkerTypes.cluster,
1082 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1083 strategyRetries: expect.any(Number),
1084 minSize: expect.any(Number),
1085 maxSize: expect.any(Number),
1086 workerNodes: expect.any(Number),
1087 idleWorkerNodes: expect.any(Number),
1088 busyWorkerNodes: expect.any(Number),
1089 executedTasks: expect.any(Number),
1090 executingTasks: expect.any(Number),
1091 failedTasks: expect.any(Number)
1093 await pool.destroy()
1096 it("Verify that pool event emitter 'busy' event can register a callback", async () => {
1097 const pool = new FixedThreadPool(
1099 './tests/worker-files/thread/testWorker.mjs'
1101 expect(pool.emitter.eventNames()).toStrictEqual([])
1102 const promises = new Set()
1105 pool.emitter.on(PoolEvents.busy, info => {
1109 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
1110 for (let i = 0; i < numberOfWorkers * 2; i++) {
1111 promises.add(pool.execute())
1113 await Promise.all(promises)
1114 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
1115 // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
1116 expect(poolBusy).toBe(numberOfWorkers + 1)
1117 expect(poolInfo).toStrictEqual({
1119 type: PoolTypes.fixed,
1120 worker: WorkerTypes.thread,
1123 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1124 strategyRetries: expect.any(Number),
1125 minSize: expect.any(Number),
1126 maxSize: expect.any(Number),
1127 workerNodes: expect.any(Number),
1128 idleWorkerNodes: expect.any(Number),
1129 busyWorkerNodes: expect.any(Number),
1130 executedTasks: expect.any(Number),
1131 executingTasks: expect.any(Number),
1132 failedTasks: expect.any(Number)
1134 await pool.destroy()
1137 it("Verify that pool event emitter 'full' event can register a callback", async () => {
1138 const pool = new DynamicThreadPool(
1139 Math.floor(numberOfWorkers / 2),
1141 './tests/worker-files/thread/testWorker.mjs'
1143 expect(pool.emitter.eventNames()).toStrictEqual([])
1144 const promises = new Set()
1147 pool.emitter.on(PoolEvents.full, info => {
1151 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.full])
1152 for (let i = 0; i < numberOfWorkers * 2; i++) {
1153 promises.add(pool.execute())
1155 await Promise.all(promises)
1156 expect(poolFull).toBe(1)
1157 expect(poolInfo).toStrictEqual({
1159 type: PoolTypes.dynamic,
1160 worker: WorkerTypes.thread,
1163 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1164 strategyRetries: expect.any(Number),
1165 minSize: expect.any(Number),
1166 maxSize: expect.any(Number),
1167 workerNodes: expect.any(Number),
1168 idleWorkerNodes: expect.any(Number),
1169 busyWorkerNodes: expect.any(Number),
1170 executedTasks: expect.any(Number),
1171 executingTasks: expect.any(Number),
1172 failedTasks: expect.any(Number)
1174 await pool.destroy()
1177 it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
1178 const pool = new FixedThreadPool(
1180 './tests/worker-files/thread/testWorker.mjs',
1182 enableTasksQueue: true
1185 stub(pool, 'hasBackPressure').returns(true)
1186 expect(pool.emitter.eventNames()).toStrictEqual([])
1187 const promises = new Set()
1188 let poolBackPressure = 0
1190 pool.emitter.on(PoolEvents.backPressure, info => {
1194 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.backPressure])
1195 for (let i = 0; i < numberOfWorkers + 1; i++) {
1196 promises.add(pool.execute())
1198 await Promise.all(promises)
1199 expect(poolBackPressure).toBe(1)
1200 expect(poolInfo).toStrictEqual({
1202 type: PoolTypes.fixed,
1203 worker: WorkerTypes.thread,
1206 defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
1207 strategyRetries: expect.any(Number),
1208 minSize: expect.any(Number),
1209 maxSize: expect.any(Number),
1210 workerNodes: expect.any(Number),
1211 idleWorkerNodes: expect.any(Number),
1212 stealingWorkerNodes: expect.any(Number),
1213 busyWorkerNodes: expect.any(Number),
1214 executedTasks: expect.any(Number),
1215 executingTasks: expect.any(Number),
1216 maxQueuedTasks: expect.any(Number),
1217 queuedTasks: expect.any(Number),
1219 stolenTasks: expect.any(Number),
1220 failedTasks: expect.any(Number)
1222 expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
1223 await pool.destroy()
1226 it('Verify that destroy() waits for queued tasks to finish', async () => {
1227 const tasksFinishedTimeout = 2500
1228 const pool = new FixedThreadPool(
1230 './tests/worker-files/thread/asyncWorker.mjs',
1232 enableTasksQueue: true,
1233 tasksQueueOptions: { tasksFinishedTimeout }
1236 const maxMultiplier = 4
1237 let tasksFinished = 0
1238 for (const workerNode of pool.workerNodes) {
1239 workerNode.on('taskFinished', () => {
1243 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1246 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1247 const startTime = performance.now()
1248 await pool.destroy()
1249 const elapsedTime = performance.now() - startTime
1250 expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
1251 expect(elapsedTime).toBeGreaterThanOrEqual(2000)
1252 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1255 it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
1256 const tasksFinishedTimeout = 1000
1257 const pool = new FixedThreadPool(
1259 './tests/worker-files/thread/asyncWorker.mjs',
1261 enableTasksQueue: true,
1262 tasksQueueOptions: { tasksFinishedTimeout }
1265 const maxMultiplier = 4
1266 let tasksFinished = 0
1267 for (const workerNode of pool.workerNodes) {
1268 workerNode.on('taskFinished', () => {
1272 for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
1275 expect(pool.info.queuedTasks).toBeGreaterThan(0)
1276 const startTime = performance.now()
1277 await pool.destroy()
1278 const elapsedTime = performance.now() - startTime
1279 expect(tasksFinished).toBe(0)
1280 expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
1283 it('Verify that pool asynchronous resource track tasks execution', async () => {
1288 let resolveCalls = 0
1289 const hook = createHook({
1290 init (asyncId, type) {
1291 if (type === 'poolifier:task') {
1293 taskAsyncId = asyncId
1297 if (asyncId === taskAsyncId) beforeCalls++
1300 if (asyncId === taskAsyncId) afterCalls++
1303 if (executionAsyncId() === taskAsyncId) resolveCalls++
1306 const pool = new FixedThreadPool(
1308 './tests/worker-files/thread/testWorker.mjs'
1311 await pool.execute()
1313 expect(initCalls).toBe(1)
1314 expect(beforeCalls).toBe(1)
1315 expect(afterCalls).toBe(1)
1316 expect(resolveCalls).toBe(1)
1317 await pool.destroy()
1320 it('Verify that hasTaskFunction() is working', async () => {
1321 const dynamicThreadPool = new DynamicThreadPool(
1322 Math.floor(numberOfWorkers / 2),
1324 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1326 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1327 expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1328 expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1331 expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
1332 expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
1333 expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
1334 await dynamicThreadPool.destroy()
1335 const fixedClusterPool = new FixedClusterPool(
1337 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1339 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1340 expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
1341 expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
1344 expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
1345 expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
1346 expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
1347 await fixedClusterPool.destroy()
1350 it('Verify that addTaskFunction() is working', async () => {
1351 const dynamicThreadPool = new DynamicThreadPool(
1352 Math.floor(numberOfWorkers / 2),
1354 './tests/worker-files/thread/testWorker.mjs'
1356 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1358 dynamicThreadPool.addTaskFunction(0, () => {})
1359 ).rejects.toThrow(new TypeError('name argument must be a string'))
1361 dynamicThreadPool.addTaskFunction('', () => {})
1363 new TypeError('name argument must not be an empty string')
1365 await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
1366 new TypeError('taskFunction property must be a function')
1368 await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
1369 new TypeError('taskFunction property must be a function')
1372 dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
1373 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1375 dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
1376 ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
1378 dynamicThreadPool.addTaskFunction('test', {
1379 taskFunction: () => {},
1383 new RangeError("Property 'priority' must be between -20 and 19")
1386 dynamicThreadPool.addTaskFunction('test', {
1387 taskFunction: () => {},
1391 new RangeError("Property 'priority' must be between -20 and 19")
1394 dynamicThreadPool.addTaskFunction('test', {
1395 taskFunction: () => {},
1396 strategy: 'invalidStrategy'
1399 new Error("Invalid worker choice strategy 'invalidStrategy'")
1401 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1402 { name: DEFAULT_TASK_NAME },
1406 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1407 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1408 const echoTaskFunction = data => {
1412 dynamicThreadPool.addTaskFunction('echo', {
1413 taskFunction: echoTaskFunction,
1414 strategy: WorkerChoiceStrategies.LEAST_ELU
1416 ).resolves.toBe(true)
1417 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1418 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1419 taskFunction: echoTaskFunction,
1420 strategy: WorkerChoiceStrategies.LEAST_ELU
1423 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1425 WorkerChoiceStrategies.ROUND_ROBIN,
1426 WorkerChoiceStrategies.LEAST_ELU
1428 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1429 { name: DEFAULT_TASK_NAME },
1431 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1433 const taskFunctionData = { test: 'test' }
1434 const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
1435 expect(echoResult).toStrictEqual(taskFunctionData)
1436 for (const workerNode of dynamicThreadPool.workerNodes) {
1437 expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
1439 executed: expect.any(Number),
1442 sequentiallyStolen: 0,
1447 history: expect.any(CircularBuffer)
1450 history: expect.any(CircularBuffer)
1452 elu: expect.objectContaining({
1453 idle: expect.objectContaining({
1454 history: expect.any(CircularBuffer)
1456 active: expect.objectContaining({
1457 history: expect.any(CircularBuffer)
1462 workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
1463 ).toBeGreaterThan(0)
1465 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
1469 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1473 workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
1474 ).toBeGreaterThan(0)
1477 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
1480 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1484 workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
1485 ).toBeGreaterThanOrEqual(0)
1488 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
1491 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1495 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1496 ).toBeGreaterThanOrEqual(0)
1498 workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
1499 ).toBeLessThanOrEqual(1)
1502 await dynamicThreadPool.destroy()
1505 it('Verify that removeTaskFunction() is working', async () => {
1506 const dynamicThreadPool = new DynamicThreadPool(
1507 Math.floor(numberOfWorkers / 2),
1509 './tests/worker-files/thread/testWorker.mjs'
1511 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1512 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1513 { name: DEFAULT_TASK_NAME },
1516 await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
1517 new Error('Cannot remove a task function not handled on the pool side')
1519 const echoTaskFunction = data => {
1522 await dynamicThreadPool.addTaskFunction('echo', {
1523 taskFunction: echoTaskFunction,
1524 strategy: WorkerChoiceStrategies.LEAST_ELU
1526 expect(dynamicThreadPool.taskFunctions.size).toBe(1)
1527 expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
1528 taskFunction: echoTaskFunction,
1529 strategy: WorkerChoiceStrategies.LEAST_ELU
1532 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1534 WorkerChoiceStrategies.ROUND_ROBIN,
1535 WorkerChoiceStrategies.LEAST_ELU
1537 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1538 { name: DEFAULT_TASK_NAME },
1540 { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
1542 await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
1545 expect(dynamicThreadPool.taskFunctions.size).toBe(0)
1546 expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
1548 ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
1549 ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
1550 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1551 { name: DEFAULT_TASK_NAME },
1554 await dynamicThreadPool.destroy()
1557 it('Verify that listTaskFunctionsProperties() is working', async () => {
1558 const dynamicThreadPool = new DynamicThreadPool(
1559 Math.floor(numberOfWorkers / 2),
1561 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1563 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1564 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1565 { name: DEFAULT_TASK_NAME },
1566 { name: 'jsonIntegerSerialization' },
1567 { name: 'factorial' },
1568 { name: 'fibonacci' }
1570 await dynamicThreadPool.destroy()
1571 const fixedClusterPool = new FixedClusterPool(
1573 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1575 await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
1576 expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
1577 { name: DEFAULT_TASK_NAME },
1578 { name: 'jsonIntegerSerialization' },
1579 { name: 'factorial' },
1580 { name: 'fibonacci' }
1582 await fixedClusterPool.destroy()
1585 it('Verify that setDefaultTaskFunction() is working', async () => {
1586 const dynamicThreadPool = new DynamicThreadPool(
1587 Math.floor(numberOfWorkers / 2),
1589 './tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
1591 await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
1592 const workerId = dynamicThreadPool.workerNodes[0].info.id
1593 await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
1595 `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
1599 dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
1602 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
1606 dynamicThreadPool.setDefaultTaskFunction('unknown')
1609 `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
1612 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1613 { name: DEFAULT_TASK_NAME },
1614 { name: 'jsonIntegerSerialization' },
1615 { name: 'factorial' },
1616 { name: 'fibonacci' }
1619 dynamicThreadPool.setDefaultTaskFunction('factorial')
1620 ).resolves.toBe(true)
1621 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1622 { name: DEFAULT_TASK_NAME },
1623 { name: 'factorial' },
1624 { name: 'jsonIntegerSerialization' },
1625 { name: 'fibonacci' }
1628 dynamicThreadPool.setDefaultTaskFunction('fibonacci')
1629 ).resolves.toBe(true)
1630 expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
1631 { name: DEFAULT_TASK_NAME },
1632 { name: 'fibonacci' },
1633 { name: 'jsonIntegerSerialization' },
1634 { name: 'factorial' }
1636 await dynamicThreadPool.destroy()
1639 it('Verify that multiple task functions worker is working', async () => {
1640 const pool = new DynamicClusterPool(
1641 Math.floor(numberOfWorkers / 2),
1643 './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
1645 const data = { n: 10 }
1646 const result0 = await pool.execute(data)
1647 expect(result0).toStrictEqual({ ok: 1 })
1648 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1649 expect(result1).toStrictEqual({ ok: 1 })
1650 const result2 = await pool.execute(data, 'factorial')
1651 expect(result2).toBe(3628800)
1652 const result3 = await pool.execute(data, 'fibonacci')
1653 expect(result3).toBe(55)
1654 expect(pool.info.executingTasks).toBe(0)
1655 expect(pool.info.executedTasks).toBe(4)
1656 for (const workerNode of pool.workerNodes) {
1657 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1658 { name: DEFAULT_TASK_NAME },
1659 { name: 'jsonIntegerSerialization' },
1660 { name: 'factorial' },
1661 { name: 'fibonacci' }
1663 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1664 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1665 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1667 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1670 executed: expect.any(Number),
1674 sequentiallyStolen: 0,
1678 history: expect.any(CircularBuffer)
1681 history: expect.any(CircularBuffer)
1685 history: expect.any(CircularBuffer)
1688 history: expect.any(CircularBuffer)
1693 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1695 ).toBeGreaterThan(0)
1698 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1700 workerNode.getTaskFunctionWorkerUsage(
1701 workerNode.info.taskFunctionsProperties[1].name
1705 await pool.destroy()
1708 it('Verify that task function objects worker is working', async () => {
1709 const pool = new DynamicThreadPool(
1710 Math.floor(numberOfWorkers / 2),
1712 './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
1714 const data = { n: 10 }
1715 const result0 = await pool.execute(data)
1716 expect(result0).toStrictEqual({ ok: 1 })
1717 const result1 = await pool.execute(data, 'jsonIntegerSerialization')
1718 expect(result1).toStrictEqual({ ok: 1 })
1719 const result2 = await pool.execute(data, 'factorial')
1720 expect(result2).toBe(3628800)
1721 const result3 = await pool.execute(data, 'fibonacci')
1722 expect(result3).toBe(55)
1723 expect(pool.info.executingTasks).toBe(0)
1724 expect(pool.info.executedTasks).toBe(4)
1725 for (const workerNode of pool.workerNodes) {
1726 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1727 { name: DEFAULT_TASK_NAME },
1728 { name: 'jsonIntegerSerialization' },
1729 { name: 'factorial' },
1730 { name: 'fibonacci' }
1732 expect(workerNode.taskFunctionsUsage.size).toBe(3)
1733 expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
1734 for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
1736 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1739 executed: expect.any(Number),
1743 sequentiallyStolen: 0,
1747 history: expect.any(CircularBuffer)
1750 history: expect.any(CircularBuffer)
1754 history: expect.any(CircularBuffer)
1757 history: expect.any(CircularBuffer)
1762 workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
1764 ).toBeGreaterThan(0)
1767 workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
1769 workerNode.getTaskFunctionWorkerUsage(
1770 workerNode.info.taskFunctionsProperties[1].name
1774 await pool.destroy()
1777 it('Verify sendKillMessageToWorker()', async () => {
1778 const pool = new DynamicClusterPool(
1779 Math.floor(numberOfWorkers / 2),
1781 './tests/worker-files/cluster/testWorker.cjs'
1783 const workerNodeKey = 0
1785 pool.sendKillMessageToWorker(workerNodeKey)
1786 ).resolves.toBeUndefined()
1787 await pool.destroy()
1790 it('Verify sendTaskFunctionOperationToWorker()', async () => {
1791 const pool = new DynamicClusterPool(
1792 Math.floor(numberOfWorkers / 2),
1794 './tests/worker-files/cluster/testWorker.cjs'
1796 const workerNodeKey = 0
1798 pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
1799 taskFunctionOperation: 'add',
1800 taskFunctionProperties: { name: 'empty' },
1801 taskFunction: (() => {}).toString()
1803 ).resolves.toBe(true)
1805 pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
1807 { name: DEFAULT_TASK_NAME },
1811 await pool.destroy()
1814 it('Verify sendTaskFunctionOperationToWorkers()', async () => {
1815 const pool = new DynamicClusterPool(
1816 Math.floor(numberOfWorkers / 2),
1818 './tests/worker-files/cluster/testWorker.cjs'
1821 pool.sendTaskFunctionOperationToWorkers({
1822 taskFunctionOperation: 'add',
1823 taskFunctionProperties: { name: 'empty' },
1824 taskFunction: (() => {}).toString()
1826 ).resolves.toBe(true)
1827 for (const workerNode of pool.workerNodes) {
1828 expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
1829 { name: DEFAULT_TASK_NAME },
1834 await pool.destroy()