import { expect } from 'expect'
import { restore, stub } from 'sinon'
-import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
+import { CircularBuffer } from '../../lib/circular-buffer.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
PoolEvents,
PoolTypes,
WorkerChoiceStrategies,
- WorkerTypes
+ WorkerTypes,
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- errorHandler: e => console.error(e)
+ errorHandler: e => console.error(e),
}
)
).toThrow(
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
elu: { median: false },
weights: expect.objectContaining({
0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
await pool.destroy()
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
runTime: { median: true },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
},
enableEvents: false,
restartWorkerOnError: false,
messageHandler: testHandler,
errorHandler: testHandler,
onlineHandler: testHandler,
- exitHandler: testHandler
+ exitHandler: testHandler,
}
)
expect(pool.emitter).toBeUndefined()
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
runTime: { median: true },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
},
onlineHandler: testHandler,
messageHandler: testHandler,
errorHandler: testHandler,
- exitHandler: testHandler
+ exitHandler: testHandler,
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ weights: { 0: 300, 1: 200 },
})
}
await pool.destroy()
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategy: 'invalidStrategy'
+ workerChoiceStrategy: 'invalidStrategy',
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategyOptions: { weights: {} }
+ workerChoiceStrategyOptions: { weights: {} },
}
)
).toThrow(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' }
+ workerChoiceStrategyOptions: { measurement: 'invalidMeasurement' },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: 'invalidTasksQueueOptions'
+ tasksQueueOptions: 'invalidTasksQueueOptions',
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: 0 }
+ tasksQueueOptions: { concurrency: 0 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: -1 }
+ tasksQueueOptions: { concurrency: -1 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { concurrency: 0.2 }
+ tasksQueueOptions: { concurrency: 0.2 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: 0 }
+ tasksQueueOptions: { size: 0 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: -1 }
+ tasksQueueOptions: { size: -1 },
}
)
).toThrow(
'./tests/worker-files/thread/testWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { size: 0.2 }
+ tasksQueueOptions: { size: 0.2 },
}
)
).toThrow(
elu: { median: false },
weights: expect.objectContaining({
0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
runTime: {
aggregate: true,
average: true,
- median: false
+ median: false,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: true,
- median: false
- }
+ median: false,
+ },
})
pool.setWorkerChoiceStrategyOptions({
runTime: { median: true },
- elu: { median: true }
+ elu: { median: true },
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: true },
- elu: { median: true }
+ elu: { median: true },
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
elu: { median: true },
weights: expect.objectContaining({
0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
runTime: {
aggregate: true,
average: false,
- median: true
+ median: true,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: false,
- median: true
- }
+ median: true,
+ },
})
pool.setWorkerChoiceStrategyOptions({
runTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
elu: { median: false },
weights: expect.objectContaining({
0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
+ [pool.info.maxSize - 1]: expect.any(Number),
+ }),
})
}
expect(
runTime: {
aggregate: true,
average: true,
- median: false
+ median: false,
},
waitTime: {
- aggregate: false,
- average: false,
- median: false
+ aggregate: true,
+ average: true,
+ median: false,
},
elu: {
aggregate: true,
average: true,
- median: false
- }
+ median: false,
+ },
})
expect(() =>
pool.setWorkerChoiceStrategyOptions('invalidWorkerChoiceStrategyOptions')
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksStealingOnBackPressure: false,
+ tasksFinishedTimeout: 2000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 3000
+ tasksFinishedTimeout: 3000,
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false,
- tasksFinishedTimeout: 3000
+ tasksFinishedTimeout: 3000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
pool.setTasksQueueOptions({
concurrency: 1,
taskStealing: true,
- tasksStealingOnBackPressure: true
+ tasksStealingOnBackPressure: true,
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true,
- tasksFinishedTimeout: 2000
+ tasksFinishedTimeout: 2000,
})
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueueBackPressureSize).toBe(
busyWorkerNodes: 0,
executedTasks: 0,
executingTasks: 0,
- failedTasks: 0
+ failedTasks: 0,
})
await pool.destroy()
pool = new DynamicClusterPool(
busyWorkerNodes: 0,
executedTasks: 0,
executingTasks: 0,
- failedTasks: 0
+ failedTasks: 0,
})
await pool.destroy()
})
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
active: {
- history: new CircularArray()
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await pool.destroy()
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
pool = new DynamicThreadPool(
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
})
type: WorkerTypes.cluster,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false,
})
}
await pool.destroy()
type: WorkerTypes.thread,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false,
})
}
await pool.destroy()
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.cjs',
{
- startWorkers: false
+ startWorkers: false,
}
)
expect(pool.info.started).toBe(false)
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await Promise.all(promises)
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
}
await pool.destroy()
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
numberOfWorkers * maxMultiplier
)
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
maxQueued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
})
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
numberOfWorkers * maxMultiplier
)
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
}
await pool.destroy()
})
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
await pool.destroy()
})
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs',
{
- enableTasksQueue: true
+ enableTasksQueue: true,
}
)
stub(pool, 'hasBackPressure').returns(true)
queuedTasks: expect.any(Number),
backPressure: true,
stolenTasks: expect.any(Number),
- failedTasks: expect.any(Number)
+ failedTasks: expect.any(Number),
})
expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
'./tests/worker-files/thread/asyncWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { tasksFinishedTimeout }
+ tasksQueueOptions: { tasksFinishedTimeout },
}
)
const maxMultiplier = 4
'./tests/worker-files/thread/asyncWorker.mjs',
{
enableTasksQueue: true,
- tasksQueueOptions: { tasksFinishedTimeout }
+ tasksQueueOptions: { tasksFinishedTimeout },
}
)
const maxMultiplier = 4
},
promiseResolve () {
if (executionAsyncId() === taskAsyncId) resolveCalls++
- }
+ },
})
const pool = new FixedThreadPool(
numberOfWorkers,
await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
new TypeError('taskFunction property must be a function')
)
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: -21,
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: 20,
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ strategy: 'invalidStrategy',
+ })
+ ).rejects.toThrow(
+ new Error("Invalid worker choice strategy 'invalidStrategy'")
+ )
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
- { name: 'test' }
+ { name: 'test' },
])
expect([
- ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
const echoTaskFunction = data => {
return data
await expect(
dynamicThreadPool.addTaskFunction('echo', {
taskFunction: echoTaskFunction,
- strategy: WorkerChoiceStrategies.LEAST_ELU
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
})
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
taskFunction: echoTaskFunction,
- strategy: WorkerChoiceStrategies.LEAST_ELU
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
})
expect([
- ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
]).toStrictEqual([
WorkerChoiceStrategies.ROUND_ROBIN,
- WorkerChoiceStrategies.LEAST_ELU
+ WorkerChoiceStrategies.LEAST_ELU,
])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
queued: 0,
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0,
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer),
},
- elu: {
- idle: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- },
- active: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- }
- }
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
+ history: expect.any(CircularBuffer),
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularBuffer),
+ }),
+ }),
})
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+ ).toBeGreaterThan(0)
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+ null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeGreaterThan(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeGreaterThanOrEqual(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeGreaterThanOrEqual(0)
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeLessThanOrEqual(1)
+ }
}
await dynamicThreadPool.destroy()
})
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
- { name: 'test' }
+ { name: 'test' },
])
await expect(dynamicThreadPool.removeTaskFunction('test')).rejects.toThrow(
new Error('Cannot remove a task function not handled on the pool side')
}
await dynamicThreadPool.addTaskFunction('echo', {
taskFunction: echoTaskFunction,
- strategy: WorkerChoiceStrategies.LEAST_ELU
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
})
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
taskFunction: echoTaskFunction,
- strategy: WorkerChoiceStrategies.LEAST_ELU
+ strategy: WorkerChoiceStrategies.LEAST_ELU,
})
expect([
- ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
]).toStrictEqual([
WorkerChoiceStrategies.ROUND_ROBIN,
- WorkerChoiceStrategies.LEAST_ELU
+ WorkerChoiceStrategies.LEAST_ELU,
])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU },
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
expect([
- ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys(),
]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
- { name: 'test' }
+ { name: 'test' },
])
await dynamicThreadPool.destroy()
})
{ name: DEFAULT_TASK_NAME },
{ name: 'jsonIntegerSerialization' },
{ name: 'factorial' },
- { name: 'fibonacci' }
+ { name: 'fibonacci' },
])
await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
{ name: DEFAULT_TASK_NAME },
{ name: 'jsonIntegerSerialization' },
{ name: 'factorial' },
- { name: 'fibonacci' }
+ { name: 'fibonacci' },
])
await fixedClusterPool.destroy()
})
{ name: DEFAULT_TASK_NAME },
{ name: 'jsonIntegerSerialization' },
{ name: 'factorial' },
- { name: 'fibonacci' }
+ { name: 'fibonacci' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('factorial')
{ name: DEFAULT_TASK_NAME },
{ name: 'factorial' },
{ name: 'jsonIntegerSerialization' },
- { name: 'fibonacci' }
+ { name: 'fibonacci' },
])
await expect(
dynamicThreadPool.setDefaultTaskFunction('fibonacci')
{ name: DEFAULT_TASK_NAME },
{ name: 'fibonacci' },
{ name: 'jsonIntegerSerialization' },
- { name: 'factorial' }
+ { name: 'factorial' },
])
await dynamicThreadPool.destroy()
})
{ name: DEFAULT_TASK_NAME },
{ name: 'jsonIntegerSerialization' },
{ name: 'factorial' },
- { name: 'fibonacci' }
+ { name: 'fibonacci' },
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
failed: 0,
queued: 0,
sequentiallyStolen: 0,
- stolen: 0
+ stolen: 0,
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer),
},
active: {
- history: expect.any(CircularArray)
- }
- }
+ history: expect.any(CircularBuffer),
+ },
+ },
+ })
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
+ ).toBeGreaterThan(0)
+ }
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+ ).toStrictEqual(
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionsProperties[1].name
+ )
+ )
+ }
+ await pool.destroy()
+ })
+
+ it('Verify that task function objects worker is working', async () => {
+ const pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
+ )
+ const data = { n: 10 }
+ const result0 = await pool.execute(data)
+ expect(result0).toStrictEqual({ ok: 1 })
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toStrictEqual({ ok: 1 })
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(55)
+ expect(pool.info.executingTasks).toBe(0)
+ expect(pool.info.executedTasks).toBe(4)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci', priority: -5 },
+ ])
+ expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(true)
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ failed: 0,
+ queued: 0,
+ sequentiallyStolen: 0,
+ stolen: 0,
+ },
+ runTime: {
+ history: expect.any(CircularBuffer),
+ },
+ waitTime: {
+ history: expect.any(CircularBuffer),
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularBuffer),
+ },
+ active: {
+ history: expect.any(CircularBuffer),
+ },
+ },
})
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
taskFunctionOperation: 'add',
taskFunctionProperties: { name: 'empty' },
- taskFunction: (() => {}).toString()
+ taskFunction: (() => {}).toString(),
})
).resolves.toBe(true)
expect(
).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'empty' }
+ { name: 'empty' },
])
await pool.destroy()
})
pool.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: { name: 'empty' },
- taskFunction: (() => {}).toString()
+ taskFunction: (() => {}).toString(),
})
).resolves.toBe(true)
for (const workerNode of pool.workerNodes) {
expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'empty' }
+ { name: 'empty' },
])
}
await pool.destroy()