+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
+ if (message.ready === false) {
+ throw new Error(`Worker ${message.workerId} failed to initialize`)
+ }
+ const workerInfo = this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ )
+ workerInfo.ready = message.ready as boolean
+ workerInfo.taskFunctions = message.taskFunctions
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ if (promiseResponse != null) {
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
+ } else {
+ promiseResponse.resolve(data as Response)
+ }
+ const workerNodeKey = promiseResponse.workerNodeKey
+ this.afterTaskExecutionHook(workerNodeKey, message)
+ this.promiseResponseMap.delete(taskId as string)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ this.workerChoiceStrategyContext.update(workerNodeKey)
+ }
+ }
+
+ private checkAndEmitTaskExecutionEvents (): void {
+ if (this.emitter != null) {
+ if (this.busy) {
+ this.emitter.emit(PoolEvents.busy, this.info)
+ }
+ if (this.type === PoolTypes.dynamic && this.full) {
+ this.emitter.emit(PoolEvents.full, this.info)
+ }
+ }
+ }
+
+ private checkAndEmitTaskQueuingEvents (): void {
+ if (this.hasBackPressure()) {
+ this.emitter?.emit(PoolEvents.backPressure, this.info)
+ }
+ }
+
+ /**
+ * Gets the worker information given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @returns The worker information.
+ */
+ protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
+ return this.workerNodes[workerNodeKey].info
+ }
+
+ /**
+ * Adds the given worker in the pool worker nodes.
+ *
+ * @param worker - The worker.
+ * @returns The added worker node key.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ */
+ private addWorkerNode (worker: Worker): number {
+ const workerNode = new WorkerNode<Worker, Data>(
+ worker,
+ this.worker,
+ this.maxSize
+ )
+ // Flag the worker node as ready at pool startup.
+ if (this.starting) {
+ workerNode.info.ready = true
+ }
+ this.workerNodes.push(workerNode)
+ const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ if (workerNodeKey === -1) {
+ throw new Error('Worker node added not found')
+ }
+ return workerNodeKey
+ }
+
+ /**
+ * Removes the given worker from the pool worker nodes.
+ *
+ * @param worker - The worker.
+ */
+ private removeWorkerNode (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ if (workerNodeKey !== -1) {
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
+ }
+ }
+
+ /** @inheritDoc */
+ public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+ return (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].hasBackPressure()
+ )
+ }
+
+ private hasBackPressure (): boolean {
+ return (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes.findIndex(
+ (workerNode) => !workerNode.hasBackPressure()
+ ) === -1
+ )
+ }
+
+ /**
+ * Executes the given task on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
+ */
+ private executeTask (workerNodeKey: number, task: Task<Data>): void {
+ this.beforeTaskExecutionHook(workerNodeKey, task)
+ this.sendToWorker(workerNodeKey, task, task.transferList)
+ this.checkAndEmitTaskExecutionEvents()
+ }
+
+ private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+ this.checkAndEmitTaskQueuingEvents()
+ return tasksQueueSize
+ }
+
+ private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask()
+ }
+
+ private tasksQueueSize (workerNodeKey: number): number {
+ return this.workerNodes[workerNodeKey].tasksQueueSize()
+ }
+
+ protected flushTasksQueue (workerNodeKey: number): void {
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ this.workerNodes[workerNodeKey].clearTasksQueue()
+ }
+
+ private flushTasksQueues (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)