throw new RangeError(
'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
)
+ } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) {
+ throw new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
} else if (this.type === PoolTypes.dynamic && min === max) {
throw new RangeError(
'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
?.worker
}
+ private checkMessageWorkerId (message: MessageValue<Response>): void {
+ if (
+ message.workerId != null &&
+ this.getWorkerById(message.workerId) == null
+ ) {
+ throw new Error(
+ `Worker message received from unknown worker '${message.workerId}'`
+ )
+ }
+ }
+
/**
* Gets the given worker its worker node key.
*
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
timestamp,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number,
id: randomUUID()
}
const res = new Promise<Response>((resolve, reject) => {
// Send startup message to worker.
this.sendToWorker(worker, {
ready: false,
- workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
// Setup worker task statistics computation.
this.setWorkerStatistics(worker)
void (this.destroyWorker(worker) as Promise<void>)
}
})
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
- this.sendToWorker(worker, { checkAlive: true })
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
return worker
}
*/
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
+ this.checkMessageWorkerId(message)
if (message.ready != null && message.workerId != null) {
// Worker ready message received
this.handleWorkerReadyMessage(message)
}
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
- const worker = this.getWorkerById(message.workerId as number)
- if (worker != null) {
- this.getWorkerInfo(this.getWorkerNodeKey(worker)).ready =
- message.ready as boolean
- } else {
- throw new Error(
- `Worker ready message received from unknown worker '${
- message.workerId as number
- }'`
- )
- }
+ const worker = this.getWorkerById(message.workerId)
+ this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+ message.ready as boolean
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- }
+ },
+ workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
})
}
}
/** @inheritDoc */
protected destroyWorker (worker: Worker): void {
- this.sendToWorker(worker, { kill: true })
+ this.sendToWorker(worker, { kill: true, workerId: worker.id })
worker.on('disconnect', () => {
worker.kill()
})
/** @inheritDoc */
protected async destroyWorker (worker: Worker): Promise<void> {
- this.sendToWorker(worker, { kill: true })
+ this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
await worker.terminate()
}
* @internal
*/
export interface Task<Data = unknown> {
+ /**
+ * Worker id.
+ */
+ readonly workerId: number
/**
* Task name.
*/
* @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
*/
export interface TaskError<Data = unknown> {
- /**
- * Worker id.
- */
- readonly workerId: number
/**
* Error message.
*/
*/
export interface MessageValue<Data = unknown, ErrorData = unknown>
extends Task<Data> {
- /**
- * Worker id.
- */
- readonly workerId?: number
/**
* Kill code.
*/
if (message.ready != null && message.workerId === this.id) {
// Startup message received
this.workerReady()
- } else if (message.statistics != null) {
+ } else if (message.statistics != null && message.workerId === this.id) {
// Statistics message received
this.statistics = message.statistics
- } else if (message.checkAlive != null) {
+ } else if (message.checkAlive != null && message.workerId === this.id) {
// Check alive message received
message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive()
- } else if (message.id != null && message.data != null) {
+ } else if (
+ message.id != null &&
+ message.data != null &&
+ message.workerId === this.id
+ ) {
// Task message received
const fn = this.getTaskFunction(message.name)
if (isAsyncFunction(fn)) {
} else {
this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
- } else if (message.kill === true) {
+ } else if (message.kill === true && message.workerId === this.id) {
// Kill message received
this.stopCheckAlive()
this.emitDestroy()
performance.now() - this.lastTaskTimestamp >
(this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
) {
- this.sendToMainWorker({ kill: this.opts.killBehavior })
+ this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
}
}
const errorMessage = this.handleError(e as Error | string)
this.sendToMainWorker({
taskError: {
- workerId: this.id,
message: errorMessage,
data: message.data
},
+ workerId: this.id,
id: message.id
})
} finally {
const errorMessage = this.handleError(e as Error | string)
this.sendToMainWorker({
taskError: {
- workerId: this.id,
message: errorMessage,
data: message.data
},
+ workerId: this.id,
id: message.id
})
})
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
- class StubPoolWithRemoveAllWorker extends FixedThreadPool {
- removeAllWorker () {
- this.workerNodes = []
- this.promiseResponseMap.clear()
- this.handleWorkerReadyMessage = () => {}
- }
- }
class StubPoolWithIsMain extends FixedThreadPool {
isMain () {
return false
'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
)
)
+ expect(
+ () =>
+ new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
+ ).toThrowError(
+ new RangeError(
+ 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+ )
+ )
})
it('Verify that pool options are checked', async () => {
await pool.destroy()
})
- it('Simulate worker not found', async () => {
- const pool = new StubPoolWithRemoveAllWorker(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.js',
- {
- errorHandler: e => console.error(e)
- }
- )
- expect(pool.workerNodes.length).toBe(numberOfWorkers)
- // Simulate worker not found.
- pool.removeAllWorker()
- expect(pool.workerNodes.length).toBe(0)
- await pool.destroy()
- })
-
it('Verify that pool worker tasks usage are initialized', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
expect(typeof inError === 'string').toBe(true)
expect(inError).toBe('Error Message from ClusterWorker')
expect(taskError).toStrictEqual({
- workerId: expect.any(Number),
message: 'Error Message from ClusterWorker',
data
})
expect(typeof inError === 'string').toBe(true)
expect(inError).toBe('Error Message from ClusterWorker:async')
expect(taskError).toStrictEqual({
- workerId: expect.any(Number),
message: 'Error Message from ClusterWorker:async',
data
})
expect(typeof inError.message === 'string').toBe(true)
expect(inError.message).toBe('Error Message from ThreadWorker')
expect(taskError).toStrictEqual({
- workerId: expect.any(Number),
message: new Error('Error Message from ThreadWorker'),
data
})
expect(typeof inError.message === 'string').toBe(true)
expect(inError.message).toBe('Error Message from ThreadWorker:async')
expect(taskError).toStrictEqual({
- workerId: expect.any(Number),
message: new Error('Error Message from ThreadWorker:async'),
data
})