From: Jérôme Benoit Date: Wed, 29 May 2024 16:00:47 +0000 (+0200) Subject: perf: enable prioritized tasks queue only when necessary X-Git-Tag: v4.0.13~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=fcfc3353eb4053c02f64c80a14ae142d44388a71;p=poolifier.git perf: enable prioritized tasks queue only when necessary Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index f7bcd617..410f4f17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Optimize tasks queue implementation. +- Enable prioritized tasks queueing only when necessary. + ## [4.0.12] - 2024-05-25 ### Changed diff --git a/src/circular-buffer.ts b/src/circular-buffer.ts index 7abb982e..7d12069f 100644 --- a/src/circular-buffer.ts +++ b/src/circular-buffer.ts @@ -11,7 +11,7 @@ export const defaultBufferSize = 2048 export class CircularBuffer { private readIdx: number private writeIdx: number - private items: Float32Array + private readonly items: Float32Array private readonly maxArrayIdx: number public size: number @@ -84,6 +84,11 @@ export class CircularBuffer { return Array.from(this.items.filter(item => item !== -1)) } + /** + * Checks the buffer size. + * + * @param size - Buffer size. + */ private checkSize (size: number): void { if (!Number.isSafeInteger(size)) { throw new TypeError( diff --git a/src/fixed-priority-queue.ts b/src/fixed-priority-queue.ts index a248aa7e..792f23d7 100644 --- a/src/fixed-priority-queue.ts +++ b/src/fixed-priority-queue.ts @@ -25,18 +25,22 @@ export class FixedPriorityQueue { private readonly nodeArray: Array> /** The fixed priority queue capacity. */ public readonly capacity: number - /** The fixed priority queue size */ + /** The fixed priority queue size. */ public size!: number + /** Whether to enable priority. */ + public enablePriority: boolean /** * Constructs a fixed priority queue. * * @param size - Fixed priority queue size. @defaultValue defaultQueueSize + * @param enablePriority - Whether to enable priority. @defaultValue false * @returns FixedPriorityQueue. */ - constructor (size: number = defaultQueueSize) { + constructor (size: number = defaultQueueSize, enablePriority = false) { this.checkSize(size) this.capacity = size + this.enablePriority = enablePriority this.nodeArray = new Array>(this.capacity) this.clear() } @@ -72,19 +76,21 @@ export class FixedPriorityQueue { throw new Error('Priority queue is full') } priority = priority ?? 0 - let index = this.start let inserted = false - for (let i = 0; i < this.size; i++) { - if (this.nodeArray[index].priority > priority) { - this.nodeArray.splice(index, 0, { data, priority }) - this.nodeArray.length !== this.capacity && - (this.nodeArray.length = this.capacity) - inserted = true - break - } - ++index - if (index === this.capacity) { - index = 0 + if (this.enablePriority) { + let index = this.start + for (let i = 0; i < this.size; i++) { + if (this.nodeArray[index].priority > priority) { + this.nodeArray.splice(index, 0, { data, priority }) + this.nodeArray.length !== this.capacity && + (this.nodeArray.length = this.capacity) + inserted = true + break + } + ++index + if (index === this.capacity) { + index = 0 + } } } if (!inserted) { @@ -172,9 +178,9 @@ export class FixedPriorityQueue { } /** - * Checks the size. + * Checks the queue size. * - * @param size - The size to check. + * @param size - Queue size. */ private checkSize (size: number): void { if (!Number.isSafeInteger(size)) { diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 2c4fdf5a..8f57a906 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2025,6 +2025,12 @@ export abstract class AbstractPool< } } + private setTasksQueuePriority (workerNodeKey: number): void { + this.workerNodes[workerNodeKey].setTasksQueuePriority( + this.getTasksQueuePriority() + ) + } + /** * This method is the message listener registered on each worker. */ @@ -2043,6 +2049,7 @@ export abstract class AbstractPool< if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -2067,6 +2074,7 @@ export abstract class AbstractPool< workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties this.sendStatisticsMessageToWorker(workerNodeKey) + this.setTasksQueuePriority(workerNodeKey) this.checkAndEmitReadyEvent() } @@ -2154,6 +2162,12 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey]?.info } + private getTasksQueuePriority (): boolean { + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.priority != null + ) + } + /** * Creates a worker node. * @@ -2171,7 +2185,8 @@ export abstract class AbstractPool< getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ).size, - tasksQueueBucketSize: defaultBucketSize + tasksQueueBucketSize: defaultBucketSize, + tasksQueuePriority: this.getTasksQueuePriority() } ) // Flag the worker node as ready at pool startup. diff --git a/src/pools/utils.ts b/src/pools/utils.ts index f2ede8b6..d4649d70 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -169,7 +169,7 @@ export const checkWorkerNodeArguments = ( } if (!isPlainObject(opts)) { throw new TypeError( - 'Cannot construct a worker node with invalid options: must be a plain object' + 'Cannot construct a worker node with invalid worker node options: must be a plain object' ) } if (opts.tasksQueueBackPressureSize == null) { @@ -202,6 +202,16 @@ export const checkWorkerNodeArguments = ( 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' ) } + if (opts.tasksQueuePriority == null) { + throw new TypeError( + 'Cannot construct a worker node without a tasks queue priority option' + ) + } + if (typeof opts.tasksQueuePriority !== 'boolean') { + throw new TypeError( + 'Cannot construct a worker node with a tasks queue priority option that is not a boolean' + ) + } } /** diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 07955701..f7a48d3b 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -70,11 +70,19 @@ export class WorkerNode } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! - this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) + this.tasksQueue = new PriorityQueue>( + opts.tasksQueueBucketSize, + opts.tasksQueuePriority + ) this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } + /** @inheritdoc */ + public setTasksQueuePriority (enablePriority: boolean): void { + this.tasksQueue.enablePriority = enablePriority + } + /** @inheritdoc */ public tasksQueueSize (): number { return this.tasksQueue.size diff --git a/src/pools/worker.ts b/src/pools/worker.ts index b2e6ba80..c1af1cf7 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -278,6 +278,7 @@ export interface WorkerNodeOptions { env?: Record tasksQueueBackPressureSize: number | undefined tasksQueueBucketSize: number | undefined + tasksQueuePriority: boolean | undefined } /** @@ -315,6 +316,12 @@ export interface IWorkerNode * This is the number of tasks that can be enqueued before the worker node has back pressure. */ tasksQueueBackPressureSize: number + /** + * Sets tasks queue priority. + * + * @param enablePriority - Whether to enable tasks queue priority. + */ + readonly setTasksQueuePriority: (enablePriority: boolean) => void /** * Tasks queue size. * diff --git a/src/priority-queue.ts b/src/priority-queue.ts index 0f75e52b..ee3310c3 100644 --- a/src/priority-queue.ts +++ b/src/priority-queue.ts @@ -34,9 +34,13 @@ export class PriorityQueue { * Constructs a priority queue. * * @param bucketSize - Prioritized bucket size. @defaultValue defaultBucketSize + * @param enablePriority - Whether to enable priority. @defaultValue false * @returns PriorityQueue. */ - public constructor (bucketSize: number = defaultBucketSize) { + public constructor ( + bucketSize: number = defaultBucketSize, + enablePriority = false + ) { if (!Number.isSafeInteger(bucketSize)) { throw new TypeError( `Invalid bucket size: '${bucketSize}' is not an integer` @@ -46,7 +50,11 @@ export class PriorityQueue { throw new RangeError(`Invalid bucket size: ${bucketSize} < 0`) } this.bucketSize = bucketSize - this.clear() + this.head = this.tail = new FixedPriorityQueue( + this.bucketSize, + enablePriority + ) + this.maxSize = 0 } /** @@ -62,6 +70,21 @@ export class PriorityQueue { return size } + public get enablePriority (): boolean { + return this.head.enablePriority + } + + public set enablePriority (enablePriority: boolean) { + if (this.head.enablePriority === enablePriority) { + return + } + let node: PriorityQueueNode | undefined = this.tail + while (node != null) { + node.enablePriority = enablePriority + node = node.next + } + } + /** * The number of filled prioritized buckets. */ @@ -78,7 +101,10 @@ export class PriorityQueue { */ public enqueue (data: T, priority?: number): number { if (this.head.full()) { - this.head = this.head.next = new FixedPriorityQueue(this.bucketSize) + this.head = this.head.next = new FixedPriorityQueue( + this.bucketSize, + this.enablePriority + ) } this.head.enqueue(data, priority) const size = this.size @@ -136,7 +162,10 @@ export class PriorityQueue { * Clears the priority queue. */ public clear (): void { - this.head = this.tail = new FixedPriorityQueue(this.bucketSize) + this.head = this.tail = new FixedPriorityQueue( + this.bucketSize, + this.enablePriority + ) this.maxSize = 0 } diff --git a/tests/fixed-priority-queue.test.mjs b/tests/fixed-priority-queue.test.mjs index e4f3abd3..99dcb001 100644 --- a/tests/fixed-priority-queue.test.mjs +++ b/tests/fixed-priority-queue.test.mjs @@ -18,16 +18,18 @@ describe('Fixed priority queue test suite', () => { expect(fixedPriorityQueue.size).toBe(0) expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array) expect(fixedPriorityQueue.capacity).toBe(defaultQueueSize) - fixedPriorityQueue = new FixedPriorityQueue(2) + expect(fixedPriorityQueue.enablePriority).toBe(false) + fixedPriorityQueue = new FixedPriorityQueue(2, true) expect(fixedPriorityQueue.start).toBe(0) expect(fixedPriorityQueue.size).toBe(0) expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array) expect(fixedPriorityQueue.capacity).toBe(2) + expect(fixedPriorityQueue.enablePriority).toBe(true) }) it('Verify enqueue() behavior', () => { const queueSize = 5 - const fixedPriorityQueue = new FixedPriorityQueue(queueSize) + const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true) let rtSize = fixedPriorityQueue.enqueue(1) expect(fixedPriorityQueue.start).toBe(0) expect(fixedPriorityQueue.size).toBe(1) @@ -84,7 +86,7 @@ describe('Fixed priority queue test suite', () => { }) it('Verify get() behavior', () => { - const fixedPriorityQueue = new FixedPriorityQueue() + const fixedPriorityQueue = new FixedPriorityQueue(defaultQueueSize, true) fixedPriorityQueue.enqueue(1) fixedPriorityQueue.enqueue(2, -1) fixedPriorityQueue.enqueue(3) @@ -96,7 +98,7 @@ describe('Fixed priority queue test suite', () => { it('Verify dequeue() behavior', () => { const queueSize = 5 - const fixedPriorityQueue = new FixedPriorityQueue(queueSize) + const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true) fixedPriorityQueue.enqueue(1) fixedPriorityQueue.enqueue(2, -1) fixedPriorityQueue.enqueue(3) diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 62df26e3..ec8fe516 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -790,6 +790,7 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() pool = new DynamicThreadPool( @@ -803,6 +804,7 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize) + expect(workerNode.tasksQueue.enablePriority).toBe(false) } await pool.destroy() }) @@ -1662,6 +1664,7 @@ describe('Abstract pool test suite', () => { ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(false) for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { expect( workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) @@ -1727,10 +1730,11 @@ describe('Abstract pool test suite', () => { { name: DEFAULT_TASK_NAME }, { name: 'jsonIntegerSerialization' }, { name: 'factorial' }, - { name: 'fibonacci' } + { name: 'fibonacci', priority: -5 } ]) expect(workerNode.taskFunctionsUsage.size).toBe(3) expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.tasksQueue.enablePriority).toBe(true) for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) { expect( workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 54a69ca4..e47ba103 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -13,12 +13,20 @@ describe('Worker node test suite', () => { const threadWorkerNode = new WorkerNode( WorkerTypes.thread, './tests/worker-files/thread/testWorker.mjs', - { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 } + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: true + } ) const clusterWorkerNode = new WorkerNode( WorkerTypes.cluster, './tests/worker-files/cluster/testWorker.cjs', - { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 } + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: true + } ) it('Worker node instantiation', () => { @@ -29,8 +37,7 @@ describe('Worker node test suite', () => { () => new WorkerNode( 'invalidWorkerType', - './tests/worker-files/thread/testWorker.mjs', - { tasksQueueBackPressureSize: 12 } + './tests/worker-files/thread/testWorker.mjs' ) ).toThrow( new TypeError( @@ -57,7 +64,7 @@ describe('Worker node test suite', () => { ) ).toThrow( new TypeError( - 'Cannot construct a worker node with invalid options: must be a plain object' + 'Cannot construct a worker node with invalid worker node options: must be a plain object' ) ) expect( @@ -185,6 +192,37 @@ describe('Worker node test suite', () => { 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer' ) ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6 + } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node without a tasks queue priority option' + ) + ) + expect( + () => + new WorkerNode( + WorkerTypes.thread, + './tests/worker-files/thread/testWorker.mjs', + { + tasksQueueBackPressureSize: 12, + tasksQueueBucketSize: 6, + tasksQueuePriority: 'invalidTasksQueuePriority' + } + ) + ).toThrow( + new RangeError( + 'Cannot construct a worker node with a tasks queue priority option that is not a boolean' + ) + ) expect(threadWorkerNode).toBeInstanceOf(WorkerNode) expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker) expect(threadWorkerNode.info).toStrictEqual({ @@ -225,6 +263,7 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(threadWorkerNode.tasksQueue.size).toBe(0) expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6) + expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true) expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size ) @@ -271,6 +310,7 @@ describe('Worker node test suite', () => { expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(clusterWorkerNode.tasksQueue.size).toBe(0) expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6) + expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true) expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size ) diff --git a/tests/priority-queue.test.mjs b/tests/priority-queue.test.mjs index ddc8c09d..5240efd2 100644 --- a/tests/priority-queue.test.mjs +++ b/tests/priority-queue.test.mjs @@ -16,17 +16,19 @@ describe('Priority queue test suite', () => { expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(0) + expect(priorityQueue.enablePriority).toBe(false) expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue) expect(priorityQueue.head.next).toBe(undefined) expect(priorityQueue.head.capacity).toBe(defaultBucketSize) expect(priorityQueue.tail).toBeInstanceOf(FixedPriorityQueue) expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) const bucketSize = 2 - priorityQueue = new PriorityQueue(bucketSize) + priorityQueue = new PriorityQueue(bucketSize, true) expect(priorityQueue.bucketSize).toBe(bucketSize) expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(0) expect(priorityQueue.maxSize).toBe(0) + expect(priorityQueue.enablePriority).toBe(true) expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue) expect(priorityQueue.head.next).toBe(undefined) expect(priorityQueue.head.capacity).toBe(bucketSize) @@ -35,7 +37,7 @@ describe('Priority queue test suite', () => { }) it('Verify default bucket size enqueue() behavior', () => { - const priorityQueue = new PriorityQueue() + const priorityQueue = new PriorityQueue(defaultBucketSize, true) let rtSize = priorityQueue.enqueue(1) expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(1) @@ -99,7 +101,7 @@ describe('Priority queue test suite', () => { }) it('Verify bucketSize=2 enqueue() behavior', () => { - const priorityQueue = new PriorityQueue(2) + const priorityQueue = new PriorityQueue(2, true) let rtSize = priorityQueue.enqueue(1) expect(priorityQueue.buckets).toBe(0) expect(priorityQueue.size).toBe(1) @@ -190,7 +192,7 @@ describe('Priority queue test suite', () => { }) it('Verify default bucket size dequeue() behavior', () => { - const priorityQueue = new PriorityQueue() + const priorityQueue = new PriorityQueue(defaultBucketSize, true) priorityQueue.enqueue(1) priorityQueue.enqueue(2, -1) priorityQueue.enqueue(3) @@ -223,7 +225,7 @@ describe('Priority queue test suite', () => { }) it('Verify bucketSize=2 dequeue() behavior', () => { - const priorityQueue = new PriorityQueue(2) + const priorityQueue = new PriorityQueue(2, true) priorityQueue.enqueue(1) priorityQueue.enqueue(2) priorityQueue.enqueue(3) @@ -279,6 +281,30 @@ describe('Priority queue test suite', () => { expect(priorityQueue.tail.next).toBe(undefined) }) + it('Verify enablePriority setter behavior', () => { + const priorityQueue = new PriorityQueue(2) + expect(priorityQueue.enablePriority).toBe(false) + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + priorityQueue.enqueue(4) + let buckets = 0 + let node = priorityQueue.tail + while (node != null) { + expect(node.enablePriority).toBe(false) + node = node.next + ++buckets + } + expect(buckets).toBe(2) + priorityQueue.enablePriority = true + expect(priorityQueue.enablePriority).toBe(true) + node = priorityQueue.tail + while (node != null) { + expect(node.enablePriority).toBe(true) + node = node.next + } + }) + it('Verify iterator behavior', () => { const priorityQueue = new PriorityQueue(2) priorityQueue.enqueue(1) diff --git a/tests/worker-files/cluster/testTaskFunctionObjectsWorker.cjs b/tests/worker-files/cluster/testTaskFunctionObjectsWorker.cjs index cb81d15b..fcb6f18a 100644 --- a/tests/worker-files/cluster/testTaskFunctionObjectsWorker.cjs +++ b/tests/worker-files/cluster/testTaskFunctionObjectsWorker.cjs @@ -1,18 +1,18 @@ 'use strict' -const { KillBehaviors, ThreadWorker } = require('../../../lib/index.cjs') +const { KillBehaviors, ClusterWorker } = require('../../../lib/index.cjs') const { factorial, fibonacci, jsonIntegerSerialization } = require('../../test-utils.cjs') -module.exports = new ThreadWorker( +module.exports = new ClusterWorker( { jsonIntegerSerialization: { taskFunction: data => jsonIntegerSerialization(data.n) }, factorial: { taskFunction: data => factorial(data.n) }, - fibonacci: { taskFunction: data => fibonacci(data.n) } + fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 } }, { killBehavior: KillBehaviors.HARD, diff --git a/tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs b/tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs index 6b5a7eda..45f4f9f2 100644 --- a/tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs +++ b/tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs @@ -11,7 +11,7 @@ export default new ThreadWorker( taskFunction: data => jsonIntegerSerialization(data.n) }, factorial: { taskFunction: data => factorial(data.n) }, - fibonacci: { taskFunction: data => fibonacci(data.n) } + fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 } }, { killBehavior: KillBehaviors.HARD,