+// eslint-disable-next-line n/no-unsupported-features/node-builtins
+import { createHook, executionAsyncId } from 'node:async_hooks'
import { EventEmitterAsyncResource } from 'node:events'
-import { dirname, join } from 'node:path'
import { readFileSync } from 'node:fs'
+import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
-import { createHook, executionAsyncId } from 'node:async_hooks'
+
import { expect } from 'expect'
import { restore, stub } from 'sinon'
+
+import { CircularBuffer } from '../../lib/circular-buffer.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
PoolEvents,
PoolTypes,
WorkerChoiceStrategies,
- WorkerTypes
-} from '../../lib/index.js'
-import { CircularArray } from '../../lib/circular-array.js'
-import { Deque } from '../../lib/deque.js'
-import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
-import { waitPoolEvents } from '../test-utils.js'
-import { WorkerNode } from '../../lib/pools/worker-node.js'
+ WorkerTypes,
+} from '../../lib/index.cjs'
+import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs'
+import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
+import { waitPoolEvents } from '../test-utils.cjs'
describe('Abstract pool test suite', () => {
const version = JSON.parse(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- errorHandler: e => console.error(e)
+ errorHandler: e => console.error(e),
}
)
).toThrow(
it('Verify that a negative number of workers is checked', () => {
expect(
() =>
- new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.js')
+ new FixedClusterPool(-1, './tests/worker-files/cluster/testWorker.cjs')
).toThrow(
new RangeError(
'Cannot instantiate a pool with a negative number of workers'
new DynamicClusterPool(
1,
undefined,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
0,
0.5,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new TypeError(
new DynamicClusterPool(
1,
1,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
).toThrow(
new RangeError(
'./tests/worker-files/thread/testWorker.mjs'
)
expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
+ expect(pool.emitter.eventNames()).toStrictEqual([])
expect(pool.opts).toStrictEqual({
startWorkers: true,
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: pool.info.maxSize,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
await pool.destroy()
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
runTime: { median: true },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
},
enableEvents: false,
restartWorkerOnError: false,
messageHandler: testHandler,
errorHandler: testHandler,
onlineHandler: testHandler,
- exitHandler: testHandler
+ exitHandler: testHandler,
}
)
expect(pool.emitter).toBeUndefined()
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
runTime: { median: true },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
},
onlineHandler: testHandler,
messageHandler: testHandler,
errorHandler: testHandler,
- exitHandler: testHandler
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ exitHandler: testHandler,
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries:
- pool.info.maxSize +
- Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
})
}
await pool.destroy()
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategy: 'invalidStrategy'
+ workerChoiceStrategy: 'invalidStrategy',
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategyOptions: { weights: {} }
+ workerChoiceStrategyOptions: { weights: {} },
}
)
).toThrow(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
+ workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: 'invalidTasksQueueOptions'
+ tasksQueueOptions: 'invalidTasksQueueOptions',
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: 0 }
+ tasksQueueOptions: { concurrency: 0 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: -1 }
+ tasksQueueOptions: { concurrency: -1 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: 0.2 }
+ tasksQueueOptions: { concurrency: 0.2 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: 0 }
+ tasksQueueOptions: { size: 0 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: -1 }
+ tasksQueueOptions: { size: -1 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: 0.2 }
+ tasksQueueOptions: { size: 0.2 },
}
)
).toThrow(
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: pool.info.maxSize,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
average: true,
- median: false
+ median: false,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: true,
- median: false
- }
+ median: false,
+ },
})
pool.setWorkerChoiceStrategyOptions({
runTime: { median: true },
- elu: { median: true }
+ elu: { median: true },
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: true },
- elu: { median: true }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: pool.info.maxSize,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: pool.info.maxSize,
runTime: { median: true },
waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
average: false,
- median: true
+ median: true,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: false,
- median: true
- }
+ median: true,
+ },
})
pool.setWorkerChoiceStrategyOptions({
runTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: false },
- elu: { median: false }
- })
- expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: pool.info.maxSize,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: pool.info.maxSize,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
average: true,
- median: false
+ median: false,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: true,
- median: false
- }
+ median: false,
+ },
})
expect(() =>
pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 3000
+ tasksFinishedTimeout: 3000,
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 3000
+ tasksFinishedTimeout: 3000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
pool.setTasksQueueOptions({
concurrency: 1,
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksFinishedTimeout: 2000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
workerNodes: numberOfWorkers,
busyWorkerNodes: 0,
executedTasks: 0,
executingTasks: 0,
- failedTasks: 0
+ failedTasks: 0,
})
await pool.destroy()
pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.info).toStrictEqual({
version,
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
workerNodes: Math.floor(numberOfWorkers / 2),
busyWorkerNodes: 0,
executedTasks: 0,
executingTasks: 0,
- failedTasks: 0
+ failedTasks: 0,
})
await pool.destroy()
})
it('Verify that pool worker tasks usage are initialized', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
active: {
- history: new CircularArray()
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await pool.destroy()
it('Verify that pool worker tasks queue are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
pool = new DynamicThreadPool(
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
})
it('Verify that pool worker info are initialized', async () => {
let pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
id: expect.any(Number),
type: WorkerTypes.cluster,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false,
+ backPressure: false,
})
}
await pool.destroy()
id: expect.any(Number),
type: WorkerTypes.thread,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false,
+ backPressure: false,
})
}
await pool.destroy()
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js',
+ './tests/worker-files/cluster/testWorker.cjs',
{
- startWorkers: false
+ startWorkers: false,
}
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
- expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
+ expect(pool.readyEventEmitted).toBe(false)
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
)
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(pool.execute(undefined, 0)).rejects.toThrow(
new TypeError('name argument must be a string')
it('Verify that pool worker tasks usage are computed', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const promises = new Set()
const maxMultiplier = 2
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await Promise.all(promises)
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await pool.destroy()
})
- it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
+ it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
numberOfWorkers * maxMultiplier
)
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: 0,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
}
await pool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.emitter.eventNames()).toStrictEqual([])
let poolInfo
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- enableTasksQueue: true
+ enableTasksQueue: true,
}
)
stub(pool, 'hasBackPressure').returns(true)
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
+ stealingWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
queuedTasks: expect.any(Number),
backPressure: true,
stolenTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
- expect(pool.hasBackPressure.callCount).toBe(5)
+ expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
})
'./tests/worker-files/thread/asyncWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { tasksFinishedTimeout }
+ tasksQueueOptions: { tasksFinishedTimeout },
}
)
const maxMultiplier = 4
const startTime = performance.now()
await pool.destroy()
const elapsedTime = performance.now() - startTime
- expect(tasksFinished).toBe(numberOfWorkers * maxMultiplier)
+ expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
expect(elapsedTime).toBeGreaterThanOrEqual(2000)
- expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 100)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
'./tests/worker-files/thread/asyncWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { tasksFinishedTimeout }
+ tasksQueueOptions: { tasksFinishedTimeout },
}
)
const maxMultiplier = 4
await pool.destroy()
const elapsedTime = performance.now() - startTime
expect(tasksFinished).toBe(0)
- expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 600)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
it('Verify that pool asynchronous resource track tasks execution', async () => {
},
promiseResolve () {
if (executionAsyncId() === taskAsyncId) resolveCalls++
- }
+ },
})
const pool = new FixedThreadPool(
numberOfWorkers,
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
new TypeError('name argument must not be an empty string')
)
await expect(dynamicThreadPool.addTaskFunction('test', 0)).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
)
await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
- new TypeError('fn argument must be a function')
+ new TypeError('taskFunction property must be a function')
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: -21,
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: 20,
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ strategy: 'invalidStrategy',
+ })
+ ).rejects.toThrow(
+ new Error("Invalid worker choice strategy 'invalidStrategy'")
+ )
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
])
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
const echoTaskFunction = data => {
return data
}
await expect(
- dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
+ })
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
+ })
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU,
+ ])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
queued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
- elu: {
- idle: {
- history: new CircularArray()
- },
- active: {
- history: new CircularArray()
- }
- }
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
+ history: expect.any(CircularBuffer),
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularBuffer),
+ }),
+ }),
})
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+ ).toBeGreaterThan(0)
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+ null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeGreaterThan(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeGreaterThanOrEqual(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeGreaterThanOrEqual(0)
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeLessThanOrEqual(1)
+ }
}
await dynamicThreadPool.destroy()
})
'./tests/worker-files/thread/testWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
])
await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
new Error('Cannot remove a task function not handled on the pool side')
const echoTaskFunction = data => {
return data
}
- await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ await dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
+ })
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
- expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
- echoTaskFunction
- )
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'echo'
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
+ })
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU,
+ ])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test'
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
])
await dynamicThreadPool.destroy()
})
- it('Verify that listTaskFunctionNames() is working', async () => {
+ it('Verify that listTaskFunctionsProperties() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' },
])
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(fixedClusterPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' },
])
await fixedClusterPool.destroy()
})
`Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('factorial')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'factorial',
- 'jsonIntegerSerialization',
- 'fibonacci'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'factorial' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'fibonacci' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('fibonacci')
).resolves.toBe(true)
- expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'fibonacci',
- 'jsonIntegerSerialization',
- 'factorial'
+ expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'fibonacci' },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
])
await dynamicThreadPool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.cjs'
)
const data = { n: 10 }
const result0 = await pool.execute(data)
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'jsonIntegerSerialization',
- 'factorial',
- 'fibonacci'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci' },
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctionNames()) {
- expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
failed: 0,
queued: 0,
sequentiallyStolen: 0,
- stolen: 0
+ stolen: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
expect(
- workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
).toBeGreaterThan(0)
}
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
workerNode.getTaskFunctionWorkerUsage(
- workerNode.info.taskFunctionNames[1]
+ workerNode.info.taskFunctionsProperties[1].name
+ )
+ )
+ }
+ await pool.destroy()
+ })
+
+ it('Verify that task function objects worker is working', async () => {
+ const pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
+ )
+ const data = { n: 10 }
+ const result0 = await pool.execute(data)
+ expect(result0).toStrictEqual({ ok: 1 })
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toStrictEqual({ ok: 1 })
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(55)
+ expect(pool.info.executingTasks).toBe(0)
+ expect(pool.info.executedTasks).toBe(4)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci', priority: -5 },
+ ])
+ expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(true)
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ failed: 0,
+ queued: 0,
+ sequentiallyStolen: 0,
+ stolen: 0,
+ },
+ runTime: {
+ history: expect.any(CircularBuffer),
+ },
+ waitTime: {
+ history: expect.any(CircularBuffer),
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularBuffer),
+ },
+ active: {
+ history: expect.any(CircularBuffer),
+ },
+ },
+ })
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
+ ).toBeGreaterThan(0)
+ }
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+ ).toStrictEqual(
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionsProperties[1].name
)
)
}
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
pool.sendKillMessageToWorker(workerNodeKey)
).resolves.toBeUndefined()
- await expect(
- pool.sendKillMessageToWorker(numberOfWorkers)
- ).rejects.toStrictEqual(
- new Error(`Invalid worker node key '${numberOfWorkers}'`)
- )
await pool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
const workerNodeKey = 0
await expect(
pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
- taskFunction: (() => {}).toString()
+ taskFunctionProperties: { name: 'empty' },
+ taskFunction: (() => {}).toString(),
})
).resolves.toBe(true)
expect(
- pool.workerNodes[workerNodeKey].info.taskFunctionNames
- ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+ pool.workerNodes[workerNodeKey].info.taskFunctionsProperties
+ ).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' },
+ ])
await pool.destroy()
})
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.cjs'
)
await expect(
pool.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
- taskFunctionName: 'empty',
- taskFunction: (() => {}).toString()
+ taskFunctionProperties: { name: 'empty' },
+ taskFunction: (() => {}).toString(),
})
).resolves.toBe(true)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctionNames).toStrictEqual([
- DEFAULT_TASK_NAME,
- 'test',
- 'empty'
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'test' },
+ { name: 'empty' },
])
}
await pool.destroy()