* Whether the pool is starting or not.
*/
private starting: boolean
+ /**
+ * Whether the pool is destroying or not.
+ */
+ private destroying: boolean
/**
* The start timestamp of the pool.
*/
this.started = false
this.starting = false
+ this.destroying = false
if (this.opts.startWorkers === true) {
this.start()
}
reject(new Error('Cannot execute a task on not started pool'))
return
}
+ if (this.destroying) {
+ reject(new Error('Cannot execute a task on destroying pool'))
+ return
+ }
if (name != null && typeof name !== 'string') {
reject(new TypeError('name argument must be a string'))
return
/** @inheritdoc */
public start (): void {
+ if (this.started) {
+ throw new Error('Cannot start an already started pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot start an already starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot start a destroying pool')
+ }
this.starting = true
while (
this.workerNodes.reduce(
/** @inheritDoc */
public async destroy (): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot destroy an already destroyed pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot destroy an starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot destroy an already destroying pool')
+ }
+ this.destroying = true
await Promise.all(
this.workerNodes.map(async (_, workerNodeKey) => {
await this.destroyWorkerNode(workerNodeKey)
)
this.emitter?.emit(PoolEvents.destroy, this.info)
this.emitter?.emitDestroy()
+ this.destroying = false
this.started = false
}
if (
this.started &&
!this.starting &&
+ !this.destroying &&
this.opts.restartWorkerOnError === true
) {
if (workerInfo.dynamic) {
this.createAndSetupWorkerNode()
}
}
- if (this.started && this.opts.enableTasksQueue === true) {
+ if (
+ this.started &&
+ !this.destroying &&
+ this.opts.enableTasksQueue === true
+ ) {
this.redistributeQueuedTasks(workerNodeKey)
}
})
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs'
)
- expect(pool.starting).toBe(false)
expect(pool.started).toBe(true)
+ expect(pool.starting).toBe(false)
+ expect(pool.destroying).toBe(false)
await pool.destroy()
})
await pool.destroy()
})
+ it('Verify that pool statuses are checked at start or destroy', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(() => pool.start()).toThrow(
+ new Error('Cannot start an already started pool')
+ )
+ await pool.destroy()
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ await expect(pool.destroy()).rejects.toThrow(
+ new Error('Cannot destroy an already destroyed pool')
+ )
+ })
+
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ const workerId = dynamicThreadPool.workerNodes[0].info.id
await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 33 with error: 'TypeError: name parameter is not a string'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction('unknown')
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 33 with error: 'Error: Cannot set the default task function to a non-existing task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([