import { expect } from 'expect'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import { WorkerTypes } from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
describe('Worker node test suite', () => {
const threadWorkerNode = new WorkerNode(
WorkerTypes.thread,
'./tests/worker-files/thread/testWorker.mjs',
- { tasksQueueBackPressureSize: 12 }
+ { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
)
const clusterWorkerNode = new WorkerNode(
WorkerTypes.cluster,
'./tests/worker-files/cluster/testWorker.cjs',
- { tasksQueueBackPressureSize: 12 }
+ { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
)
it('Worker node instantiation', () => {
'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
)
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ tasksQueueBackPressureSize: 12
+ }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Cannot construct a worker node without a tasks queue bucket size option'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ tasksQueueBackPressureSize: 12,
+ tasksQueueBucketSize: 'invalidTasksQueueBucketSize'
+ }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0.2 }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not an integer'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 0 }
+ )
+ ).toThrow(
+ new RangeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: -1 }
+ )
+ ).toThrow(
+ new RangeError(
+ 'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
+ )
+ )
expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
expect(threadWorkerNode.info).toStrictEqual({
type: WorkerTypes.thread,
dynamic: false,
ready: false,
- stealing: false
+ stealing: false,
+ backPressure: false
})
expect(threadWorkerNode.usage).toStrictEqual({
tasks: {
})
expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
- expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(threadWorkerNode.tasksQueue.size).toBe(0)
+ expect(threadWorkerNode.tasksQueue.k).toBe(6)
expect(threadWorkerNode.tasksQueueSize()).toBe(
threadWorkerNode.tasksQueue.size
)
- expect(threadWorkerNode.onBackPressureStarted).toBe(false)
+ expect(threadWorkerNode.setBackPressureFlag).toBe(false)
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
type: WorkerTypes.cluster,
dynamic: false,
ready: false,
- stealing: false
+ stealing: false,
+ backPressure: false
})
expect(clusterWorkerNode.usage).toStrictEqual({
tasks: {
})
expect(clusterWorkerNode.messageChannel).toBeUndefined()
expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
- expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(clusterWorkerNode.tasksQueue.size).toBe(0)
+ expect(clusterWorkerNode.tasksQueue.k).toBe(6)
expect(clusterWorkerNode.tasksQueueSize()).toBe(
clusterWorkerNode.tasksQueue.size
)
- expect(clusterWorkerNode.onBackPressureStarted).toBe(false)
+ expect(clusterWorkerNode.setBackPressureFlag).toBe(false)
expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
})