+
+ /**
+ * This function is the listener registered for each worker message.
+ *
+ * @returns The listener function to execute when a message is received from a worker.
+ */
+ protected workerListener (): (message: MessageValue<Response>) => void {
+ return message => {
+ if (message.id != null) {
+ // Task execution response received
+ const promiseResponse = this.promiseResponseMap.get(message.id)
+ if (promiseResponse != null) {
+ if (message.error != null) {
+ promiseResponse.reject(message.error)
+ } else {
+ promiseResponse.resolve(message.data as Response)
+ }
+ this.afterTaskExecutionHook(promiseResponse.worker, message)
+ this.promiseResponseMap.delete(message.id)
+ const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ }
+ }
+ }
+
+ private checkAndEmitEvents (): void {
+ if (this.opts.enableEvents === true) {
+ if (this.busy) {
+ this.emitter?.emit(PoolEvents.busy)
+ }
+ if (this.type === PoolType.DYNAMIC && this.full) {
+ this.emitter?.emit(PoolEvents.full)
+ }
+ }
+ }
+
+ /**
+ * Sets the given worker node its tasks usage in the pool.
+ *
+ * @param workerNode - The worker node.
+ * @param tasksUsage - The worker node tasks usage.
+ */
+ private setWorkerNodeTasksUsage (
+ workerNode: WorkerNode<Worker, Data>,
+ tasksUsage: TasksUsage
+ ): void {
+ workerNode.tasksUsage = tasksUsage
+ }
+
+ /**
+ * Pushes the given worker in the pool worker nodes.
+ *
+ * @param worker - The worker.
+ * @returns The worker nodes length.
+ */
+ private pushWorkerNode (worker: Worker): number {
+ return this.workerNodes.push({
+ worker,
+ tasksUsage: {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ },
+ tasksQueue: new Queue<Task<Data>>()
+ })
+ }
+
+ /**
+ * Sets the given worker in the pool worker nodes.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param worker - The worker.
+ * @param tasksUsage - The worker tasks usage.
+ * @param tasksQueue - The worker task queue.
+ */
+ private setWorkerNode (
+ workerNodeKey: number,
+ worker: Worker,
+ tasksUsage: TasksUsage,
+ tasksQueue: Queue<Task<Data>>
+ ): void {
+ this.workerNodes[workerNodeKey] = {
+ worker,
+ tasksUsage,
+ tasksQueue
+ }
+ }
+
+ /**
+ * Removes the given worker from the pool worker nodes.
+ *
+ * @param worker - The worker.
+ */
+ private removeWorkerNode (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
+ }
+
+ private executeTask (workerNodeKey: number, task: Task<Data>): void {
+ this.beforeTaskExecutionHook(workerNodeKey)
+ this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
+ }
+
+ private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
+ }
+
+ private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
+ }
+
+ private tasksQueueSize (workerNodeKey: number): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.size
+ }
+
+ private flushTasksQueue (workerNodeKey: number): void {
+ if (this.tasksQueueSize(workerNodeKey) > 0) {
+ for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ }
+
+ private flushTasksQueues (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }