X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=tests%2Fpools%2Fworker-node.test.mjs;h=a29322825912d17dc2027d0c57eaa0e76b1a2212;hb=3c2426eb135938bf35b17771f32ed7a39e252173;hp=cfca3422b0f3ff727138608a65f7542dff63ba35;hpb=c3719753af0a9be03abf722a7543495359e817b5;p=poolifier.git diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index cfca3422..a2932282 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -1,21 +1,24 @@ -import { MessageChannel } from 'node:worker_threads' +import { Worker as ClusterWorker } from 'node:cluster' +import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' + import { expect } from 'expect' -import { WorkerNode } from '../../lib/pools/worker-node.js' -import { WorkerTypes } from '../../lib/index.js' -import { CircularArray } from '../../lib/circular-array.js' -import { Deque } from '../../lib/deque.js' -import { DEFAULT_TASK_NAME } from '../../lib/utils.js' + +import { CircularArray } from '../../lib/circular-array.cjs' +import { WorkerTypes } from '../../lib/index.cjs' +import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' +import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' describe('Worker node test suite', () => { const threadWorkerNode = new WorkerNode( WorkerTypes.thread, './tests/worker-files/thread/testWorker.mjs', - { tasksQueueBackPressureSize: 12 } + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 } ) const clusterWorkerNode = new WorkerNode( WorkerTypes.cluster, - './tests/worker-files/cluster/testWorker.js', - { tasksQueueBackPressureSize: 12 } + './tests/worker-files/cluster/testWorker.cjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 } ) it('Worker node instantiation', () => { @@ -117,12 +120,80 @@ describe('Worker node test suite', () => { 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer' ) ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12 + } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node without a tasks queue bucket size option' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 'invalidTasksQueueBucketSize' + } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 } + ) + ).toThrow( + new TypeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' + ) + ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) + expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker) expect(threadWorkerNode.info).toStrictEqual({ id: threadWorkerNode.worker.threadId, type: WorkerTypes.thread, dynamic: false, - ready: false + ready: false, + stealing: false, + backPressure: false }) expect(threadWorkerNode.usage).toStrictEqual({ tasks: { @@ -151,20 +222,24 @@ describe('Worker node test suite', () => { }) expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel) expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(threadWorkerNode.tasksQueue.size).toBe(0) + expect(threadWorkerNode.tasksQueue.k).toBe(6) expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size ) - expect(threadWorkerNode.onBackPressureStarted).toBe(false) + expect(threadWorkerNode.setBackPressureFlag).toBe(false) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) + expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker) expect(clusterWorkerNode.info).toStrictEqual({ id: clusterWorkerNode.worker.id, type: WorkerTypes.cluster, dynamic: false, - ready: false + ready: false, + stealing: false, + backPressure: false }) expect(clusterWorkerNode.usage).toStrictEqual({ tasks: { @@ -193,12 +268,13 @@ describe('Worker node test suite', () => { }) expect(clusterWorkerNode.messageChannel).toBeUndefined() expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(clusterWorkerNode.tasksQueue.size).toBe(0) + expect(clusterWorkerNode.tasksQueue.k).toBe(6) expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size ) - expect(clusterWorkerNode.onBackPressureStarted).toBe(false) + expect(clusterWorkerNode.setBackPressureFlag).toBe(false) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) @@ -207,18 +283,25 @@ describe('Worker node test suite', () => { threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrow( new TypeError( - "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined" + "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list is not yet defined" ) ) - threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1'] + threadWorkerNode.info.taskFunctionsProperties = [ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' } + ] expect(() => threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction') ).toThrow( new TypeError( - "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements" + "Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function properties list has less than 3 elements" ) ) - threadWorkerNode.info.taskFunctionNames = [DEFAULT_TASK_NAME, 'fn1', 'fn2'] + threadWorkerNode.info.taskFunctionsProperties = [ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' }, + { name: 'fn2' } + ] expect( threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME) ).toStrictEqual({ @@ -297,10 +380,10 @@ describe('Worker node test suite', () => { }) it('Worker node deleteTaskFunctionWorkerUsage()', () => { - expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([ - DEFAULT_TASK_NAME, - 'fn1', - 'fn2' + expect(threadWorkerNode.info.taskFunctionsProperties).toStrictEqual([ + { name: DEFAULT_TASK_NAME }, + { name: 'fn1' }, + { name: 'fn2' } ]) expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) expect(