import { restore, stub } from 'sinon'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: true
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
pool = new DynamicThreadPool(
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.k).toBe(numberOfWorkers * 2)
}
await pool.destroy()
})
type: WorkerTypes.cluster,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
type: WorkerTypes.thread,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
await expect(dynamicThreadPool.addTaskFunction('test', '')).rejects.toThrow(
new TypeError('taskFunction property must be a function')
)
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: 0 })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', { taskFunction: '' })
+ ).rejects.toThrow(new TypeError('taskFunction property must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: -21
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ priority: 20
+ })
+ ).rejects.toThrow(
+ new RangeError("Property 'priority' must be between -20 and 19")
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', {
+ taskFunction: () => {},
+ strategy: 'invalidStrategy'
+ })
+ ).rejects.toThrow(
+ new Error("Invalid worker choice strategy 'invalidStrategy'")
+ )
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
waitTime: {
history: new CircularArray()
},
- elu: {
- idle: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- },
- active: {
- aggregate: 0,
- maximum: 0,
- minimum: 0,
- history: new CircularArray()
- }
- }
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
+ history: expect.any(CircularArray)
+ })
+ })
})
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+ ).toBeGreaterThan(0)
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+ null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+ ).toBeGreaterThan(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+ ).toBeGreaterThanOrEqual(0)
+ }
+ if (
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+ ) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeUndefined()
+ } else {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeGreaterThanOrEqual(0)
+ expect(
+ workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+ ).toBeLessThanOrEqual(1)
+ }
}
await dynamicThreadPool.destroy()
})