X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fworker-node.test.mjs;h=e47ba10326d82cce07c5737b2050b0711d21d729;hb=937d524da3a5cce4795b85ddd1c430f0d184f731;hp=f22ea153f22f00e9af20576431efcae57499eb07;hpb=5eb72b9e26eaffb43c67147fbc6b4d2b1b959d62;p=poolifier.git diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index f22ea153..e47ba103 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -1,22 +1,32 @@ -import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' import { Worker as ClusterWorker } from 'node:cluster' +import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' + import { expect } from 'expect' -import { WorkerNode } from '../../lib/pools/worker-node.js' -import { WorkerTypes } from '../../lib/index.js' -import { CircularArray } from '../../lib/circular-array.js' -import { Deque } from '../../lib/deque.js' -import { DEFAULT_TASK_NAME } from '../../lib/utils.js' + +import { CircularBuffer } from '../../lib/circular-buffer.cjs' +import { WorkerTypes } from '../../lib/index.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' +import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' describe('Worker node test suite', () => { const threadWorkerNode = new WorkerNode( WorkerTypes.thread, './tests/worker-files/thread/testWorker.mjs', - { tasksQueueBackPressureSize: 12 } + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: true + } ) const clusterWorkerNode = new WorkerNode( WorkerTypes.cluster, - './tests/worker-files/cluster/testWorker.js', - { tasksQueueBackPressureSize: 12 } + './tests/worker-files/cluster/testWorker.cjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: true + } ) it('Worker node instantiation', () => { @@ -27,8 +37,7 @@ describe('Worker node test suite', () => { () => new WorkerNode( 'invalidWorkerType', - './tests/worker-files/thread/testWorker.mjs', - { tasksQueueBackPressureSize: 12 } + './tests/worker-files/thread/testWorker.mjs' ) ).toThrow( new TypeError( @@ -55,7 +64,7 @@ describe('Worker node test suite', () => { ) ).toThrow( new TypeError( - 'Cannot construct a worker node with invalid options: must be a plain object' + 'Cannot construct a worker node with invalid worker node options: must be a plain object' ) ) expect( @@ -118,6 +127,102 @@ describe('Worker node test suite', () => { 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12 + } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 'invalidTasksQueueBucketSize' + } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6 + } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node without a tasks queue priority option' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: 'invalidTasksQueuePriority' + } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue priority option that is not a boolean' + ) + ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker) expect(threadWorkerNode.info).toStrictEqual({ @@ -125,7 +230,8 @@ describe('Worker node test suite', () => { type: WorkerTypes.thread, dynamic: false, ready: false, - stealing: false + stealing: false, + backPressure: false }) expect(threadWorkerNode.usage).toStrictEqual({ tasks: { @@ -138,28 +244,30 @@ describe('Worker node test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel) expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(threadWorkerNode.tasksQueue.size).toBe(0) + expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6) + expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true) expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size ) - expect(threadWorkerNode.onBackPressureStarted).toBe(false) + expect(threadWorkerNode.setBackPressureFlag).toBe(false) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) @@ -169,7 +277,8 @@ describe('Worker node test suite', () => { type: WorkerTypes.cluster, dynamic: false, ready: false, - stealing: false + stealing: false, + backPressure: false }) expect(clusterWorkerNode.usage).toStrictEqual({ tasks: { @@ -182,28 +291,30 @@ describe('Worker node test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) expect(clusterWorkerNode.messageChannel).toBeUndefined() expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(clusterWorkerNode.tasksQueue.size).toBe(0) + expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6) + expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true) expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size ) - expect(clusterWorkerNode.onBackPressureStarted).toBe(false) + expect(clusterWorkerNode.setBackPressureFlag).toBe(false) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) @@ -212,18 +323,25 @@ describe('Worker node test suite', () => { threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrow( new TypeError( - "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined" + "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined" ) ) - threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1'] + threadWorkerNode.info.taskFunctionsProperties = [ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' } + ] expect(() => threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrow( new TypeError( - "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements" + "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements" ) ) - threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] + threadWorkerNode.info.taskFunctionsProperties = [ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' }, + { name: 'fn2' } + ] expect( threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual({ @@ -236,17 +354,17 @@ describe('Worker node test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -260,17 +378,17 @@ describe('Worker node test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -284,17 +402,17 @@ describe('Worker node test suite', () => { failed: 0 }, runTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, waitTime: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, elu: { idle: { - history: new CircularArray() + history: expect.any(CircularBuffer) }, active: { - history: new CircularArray() + history: expect.any(CircularBuffer) } } }) @@ -302,10 +420,10 @@ describe('Worker node test suite', () => { }) it('Worker node deleteTaskFunctionWorkerUsage()', () => { - expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fn1', - 'fn2' + expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' }, + { name: 'fn2' } ]) expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) expect(