'Cannot instantiate a pool with a negative number of workers'
)
} else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
- throw new Error('Cannot instantiate a fixed pool with no worker')
+ throw new RangeError('Cannot instantiate a fixed pool with zero worker')
+ }
+ }
+
+ protected checkDynamicPoolSize (min: number, max: number): void {
+ if (this.type === PoolTypes.dynamic && min > max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ } else if (this.type === PoolTypes.dynamic && min === max) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
}
}
version,
type: this.type,
worker: this.worker,
+ ready: this.ready,
+ strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
maxSize: this.maxSize,
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
}
}
+ private get starting (): boolean {
+ return (
+ !this.full ||
+ (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
+ )
+ }
+
+ private get ready (): boolean {
+ return (
+ this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
+ )
+ }
+
/**
* Gets the approximate pool utilization.
*
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
+ // Send startup message to worker.
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
+ })
+ // Setup worker task statistics computation.
+ this.setWorkerStatistics(worker)
}
/**
if (this.opts.enableTasksQueue === true) {
this.redistributeQueuedTasks(worker)
}
- if (this.opts.restartWorkerOnError === true) {
+ if (this.opts.restartWorkerOnError === true && !this.starting) {
if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
this.createAndSetupDynamicWorker()
} else {
this.pushWorkerNode(worker)
- this.setWorkerStatistics(worker)
-
this.afterWorkerSetup(worker)
return worker
*/
protected createAndSetupDynamicWorker (): Worker {
const worker = this.createAndSetupWorker()
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
this.registerWorkerMessageListener(worker, message => {
const workerNodeKey = this.getWorkerNodeKey(worker)
if (
void (this.destroyWorker(worker) as Promise<void>)
}
})
+ this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
this.sendToWorker(worker, { checkAlive: true })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
- if (message.workerId != null && message.ready != null) {
+ if (message.ready != null && message.workerId != null) {
// Worker ready message received
this.handleWorkerReadyMessage(message)
} else if (message.id != null) {
}'`
)
}
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
+ }
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
const { CircularArray } = require('../../../lib/circular-array')
const { Queue } = require('../../../lib/queue')
const { version } = require('../../../package.json')
+const { waitPoolEvents } = require('../../test-utils')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
removeAllWorker () {
this.workerNodes = []
this.promiseResponseMap.clear()
+ this.handleWorkerReadyMessage = () => {}
}
}
class StubPoolWithIsMain extends FixedThreadPool {
)
})
+ it('Verify dynamic pool sizing', () => {
+ expect(
+ () =>
+ new DynamicThreadPool(2, 1, './tests/worker-files/thread/testWorker.js')
+ ).toThrowError(
+ new RangeError(
+ 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
+ )
+ )
+ expect(
+ () =>
+ new DynamicThreadPool(1, 1, './tests/worker-files/thread/testWorker.js')
+ ).toThrowError(
+ new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
+ )
+ )
+ })
+
it('Verify that pool options are checked', async () => {
let pool = new FixedThreadPool(
numberOfWorkers,
).toThrowError('Invalid worker tasks concurrency: must be an integer')
})
- it('Verify that worker choice strategy options can be set', async () => {
+ it('Verify that pool worker choice strategy options can be set', async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js',
await pool.destroy()
})
- it('Verify that tasks queue can be enabled/disabled', async () => {
+ it('Verify that pool tasks queue can be enabled/disabled', async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
await pool.destroy()
})
- it('Verify that tasks queue options can be set', async () => {
+ it('Verify that pool tasks queue options can be set', async () => {
const pool = new FixedThreadPool(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js',
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
+ ready: false,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
workerNodes: numberOfWorkers,
})
await pool.destroy()
pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- numberOfWorkers * 2,
'./tests/worker-files/cluster/testWorker.js'
)
expect(pool.info).toStrictEqual({
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
- minSize: numberOfWorkers,
- maxSize: numberOfWorkers * 2,
- workerNodes: numberOfWorkers,
- idleWorkerNodes: numberOfWorkers,
+ ready: false,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: Math.floor(numberOfWorkers / 2),
+ maxSize: numberOfWorkers,
+ workerNodes: Math.floor(numberOfWorkers / 2),
+ idleWorkerNodes: Math.floor(numberOfWorkers / 2),
busyWorkerNodes: 0,
executedTasks: 0,
executingTasks: 0,
await pool.destroy()
})
- it('Verify that worker pool tasks usage are initialized', async () => {
+ it('Verify that pool worker tasks usage are initialized', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js'
await pool.destroy()
})
- it('Verify that worker pool tasks queue are initialized', async () => {
- const pool = new FixedClusterPool(
+ it('Verify that pool worker tasks queue are initialized', async () => {
+ let pool = new FixedClusterPool(
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js'
)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
await pool.destroy()
+ pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+ expect(workerNode.tasksQueue.size).toBe(0)
+ expect(workerNode.tasksQueue.maxSize).toBe(0)
+ }
+ })
+
+ it('Verify that pool worker info are initialized', async () => {
+ let pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info).toStrictEqual({
+ id: expect.any(Number),
+ type: WorkerTypes.cluster,
+ dynamic: false,
+ ready: false
+ })
+ }
+ await pool.destroy()
+ pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info).toStrictEqual({
+ id: expect.any(Number),
+ type: WorkerTypes.thread,
+ dynamic: false,
+ ready: false
+ })
+ }
})
- it('Verify that worker pool tasks usage are computed', async () => {
+ it('Verify that pool worker tasks usage are computed', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js'
await pool.destroy()
})
- it('Verify that worker pool tasks usage are reset at worker choice strategy change', async () => {
+ it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
const pool = new DynamicThreadPool(
- numberOfWorkers,
+ Math.floor(numberOfWorkers / 2),
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
)
it("Verify that pool event emitter 'full' event can register a callback", async () => {
const pool = new DynamicThreadPool(
- numberOfWorkers,
+ Math.floor(numberOfWorkers / 2),
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
)
promises.add(pool.execute())
}
await Promise.all(promises)
- // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
- // So in total numberOfWorkers * 2 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = max = numberOfWorkers.
- expect(poolFull).toBe(numberOfWorkers * 2)
+ // The `full` event is triggered when the number of submitted tasks at once reach the maximum number of workers in the dynamic pool.
+ // So in total numberOfWorkers * 2 - 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool with min = (max = numberOfWorkers) / 2.
+ expect(poolFull).toBe(numberOfWorkers * 2 - 1)
expect(poolInfo).toStrictEqual({
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.thread,
+ ready: expect.any(Boolean),
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: expect.any(Number),
+ maxSize: expect.any(Number),
+ workerNodes: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ queuedTasks: expect.any(Number),
+ maxQueuedTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
+ })
+ await pool.destroy()
+ })
+
+ it("Verify that pool event emitter 'ready' event can register a callback", async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ let poolReady = 0
+ let poolInfo
+ pool.emitter.on(PoolEvents.ready, info => {
+ ++poolReady
+ poolInfo = info
+ })
+ await waitPoolEvents(pool, PoolEvents.ready, 1)
+ expect(poolReady).toBe(1)
+ expect(poolInfo).toStrictEqual({
+ version,
+ type: PoolTypes.fixed,
+ worker: WorkerTypes.cluster,
+ ready: true,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
+ ready: expect.any(Boolean),
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
it('Verify that multiple tasks worker is working', async () => {
const pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- numberOfWorkers * 2,
'./tests/worker-files/cluster/testMultiTasksWorker.js'
)
const data = { n: 10 }