import { restore, stub } from 'sinon'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
enableTasksQueue: false,
workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
errorHandler: testHandler,
exitHandler: testHandler
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
runTime: { median: true },
elu: { median: true }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
runTime: { median: false },
elu: { median: false }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
await pool.destroy()
})
- it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
+ it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: 0,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,
}
}
})
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
expect(workerNode.usage.runTime.history.length).toBe(0)
expect(workerNode.usage.waitTime.history.length).toBe(0)
expect(workerNode.usage.elu.idle.history.length).toBe(0)
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
])
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
const echoTaskFunction = data => {
return data
}
await expect(
- dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
- taskFunction: echoTaskFunction
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
})
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo' }
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
},
elu: {
idle: {
+ aggregate: 0,
+ maximum: 0,
+ minimum: 0,
history: new CircularArray()
},
active: {
+ aggregate: 0,
+ maximum: 0,
+ minimum: 0,
history: new CircularArray()
}
}
const echoTaskFunction = data => {
return data
}
- await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ await dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
- taskFunction: echoTaskFunction
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
})
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo' }
+ { name: 'echo', strategy: WorkerChoiceStrategies.LEAST_ELU }
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
await dynamicThreadPool.destroy()
})
- it('Verify that listTaskFunctionNames() is working', async () => {
+ it('Verify that listTaskFunctionsProperties() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,