DynamicThreadPool,
FixedClusterPool,
FixedThreadPool,
+ Measurements,
PoolTypes,
WorkerChoiceStrategies,
WorkerTypes
WorkerChoiceStrategies
)) {
for (const enableTasksQueue of [false, true]) {
- suite.add(
- `${name}|${workerChoiceStrategy}|${
- enableTasksQueue ? 'with' : 'without'
- } tasks queue`,
- async () => {
- pool.setWorkerChoiceStrategy(workerChoiceStrategy)
- pool.enableTasksQueue(enableTasksQueue)
- assert.strictEqual(
- pool.opts.workerChoiceStrategy,
- workerChoiceStrategy
+ if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
+ for (const measurement of [
+ Measurements.runTime,
+ Measurements.elu
+ ]) {
+ suite.add(
+ `${name}|${workerChoiceStrategy}|${measurement}|${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy, {
+ measurement
+ })
+ pool.enableTasksQueue(enableTasksQueue)
+ assert.strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ assert.strictEqual(
+ pool.opts.enableTasksQueue,
+ enableTasksQueue
+ )
+ assert.strictEqual(
+ pool.opts.workerChoiceStrategyOptions.measurement,
+ measurement
+ )
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
)
- assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
- await runPoolifierPool(pool, {
- taskExecutions,
- workerData
- })
}
- )
+ } else {
+ suite.add(
+ `${name}|${workerChoiceStrategy}|${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ pool.enableTasksQueue(enableTasksQueue)
+ assert.strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
+ )
+ }
}
}
suite
const tasksMap = generateRandomTasksMap(60, 20)
function loopSelect (tasksMap) {
- let minValue = Infinity
let minKey
+ let minValue = Infinity
for (const [key, value] of tasksMap) {
if (value === 0) {
return key
resolution: {integrity: sha512-XktuhWlJ5g+3TJXc5upd9Ks1HutSArik6jf2eAjYFyIOf4ej3RN+184cZbzDvbPnuTJIUhPKKJE3cIsYTiAT3w==}
engines: {node: '>=6.9.0'}
dependencies:
- '@babel/highlight': 7.22.13
+ '@babel/highlight': 7.22.20
chalk: 2.4.2
dev: true
- /@babel/helper-validator-identifier@7.22.19:
- resolution: {integrity: sha512-Tinq7ybnEPFFXhlYOYFiSjespWQk0dq2dRNAiMdRTOYQzEGqnnNyrTxPYHP5r6wGjlF1rFgABdDV0g8EwD6Qbg==}
+ /@babel/helper-validator-identifier@7.22.20:
+ resolution: {integrity: sha512-Y4OZ+ytlatR8AI+8KZfKuL5urKp7qey08ha31L8b3BwewJAoJamTzyvxPR/5D+KkdJCGPq/+8TukHBlY10FX9A==}
engines: {node: '>=6.9.0'}
requiresBuild: true
dev: true
- /@babel/highlight@7.22.13:
- resolution: {integrity: sha512-C/BaXcnnvBCmHTpz/VGZ8jgtE2aYlW4hxDhseJAWZb7gqGM/qtCK6iZUb0TyKFf7BOUsBH7Q7fkRsDRhg1XklQ==}
+ /@babel/highlight@7.22.20:
+ resolution: {integrity: sha512-dkdMCN3py0+ksCgYmGG8jKeGA/8Tk+gJwSYYlFGxG5lmhfKNoAy004YpLxpS1W2J8m/EK2Ew+yOs9pVRwO89mg==}
engines: {node: '>=6.9.0'}
requiresBuild: true
dependencies:
- '@babel/helper-validator-identifier': 7.22.19
+ '@babel/helper-validator-identifier': 7.22.20
chalk: 2.4.2
js-tokens: 4.0.0
dev: true
resolution: {integrity: sha512-OHx4Qwrrt0E4jEIcI5/Xb+f+QmJYNj2rrK8wiIdQOIrB9WrrJL8cjZvXdXuBTkkEwEqLycb5BeZDV1o2i9bTew==}
engines: {node: '>=12.0.0'}
dependencies:
- flatted: 3.2.8
+ flatted: 3.2.9
keyv: 4.5.3
rimraf: 3.0.2
dev: true
hasBin: true
dev: true
- /flatted@3.2.8:
- resolution: {integrity: sha512-6qu0W+A94UKNJRs3ffE8s/fWSHQbjqdNx8elGAe95IqnJA77P68TFz4+2cwC28ouAibiZdGBeV6DsvvMg+4vhQ==}
+ /flatted@3.2.9:
+ resolution: {integrity: sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==}
dev: true
/for-each@0.3.3:
export interface PoolOptions<Worker extends IWorker> {
/**
* A function that will listen for online event on each worker.
+ *
+ * @defaultValue `() => {}`
*/
onlineHandler?: OnlineHandler<Worker>
/**
* A function that will listen for message event on each worker.
+ *
+ * @defaultValue `() => {}`
*/
messageHandler?: MessageHandler<Worker>
/**
* A function that will listen for error event on each worker.
+ *
+ * @defaultValue `() => {}`
*/
errorHandler?: ErrorHandler<Worker>
/**
* A function that will listen for exit event on each worker.
+ *
+ * @defaultValue `() => {}`
*/
exitHandler?: ExitHandler<Worker>
/**
expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
expect(threadWorkerNode.tasksQueue.size).toBe(0)
+ expect(threadWorkerNode.tasksQueueSize()).toBe(
+ threadWorkerNode.tasksQueue.size
+ )
+ expect(threadWorkerNode.onBackPressureStarted).toBe(false)
+ expect(threadWorkerNode.onEmptyQueueCount).toBe(0)
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
expect(clusterWorkerNode.tasksQueue.size).toBe(0)
+ expect(clusterWorkerNode.tasksQueueSize()).toBe(
+ clusterWorkerNode.tasksQueue.size
+ )
+ expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
+ expect(clusterWorkerNode.onEmptyQueueCount).toBe(0)
expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
})