+ /**
+ * This method is the message listener registered on each worker.
+ */
+ protected readonly workerMessageListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ const { workerId, ready, taskId, taskFunctionNames } = message
+ if (ready != null && taskFunctionNames != null) {
+ // Worker ready response received from worker
+ this.handleWorkerReadyResponse(message)
+ } else if (taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
+ } else if (taskFunctionNames != null) {
+ // Task function names message received from worker
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(workerId)
+ ).taskFunctionNames = taskFunctionNames
+ }
+ }
+
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
+ const { workerId, ready, taskFunctionNames } = message
+ if (ready === false) {
+ throw new Error(`Worker ${workerId as number} failed to initialize`)
+ }
+ const workerInfo = this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(workerId)
+ )
+ workerInfo.ready = ready as boolean
+ workerInfo.taskFunctionNames = taskFunctionNames
+ if (!this.readyEventEmitted && this.ready) {
+ this.readyEventEmitted = true
+ this.emitter?.emit(PoolEvents.ready, this.info)
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const { workerId, taskId, workerError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ if (promiseResponse != null) {
+ const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
+ const workerNode = this.workerNodes[workerNodeKey]
+ if (workerError != null) {
+ this.emitter?.emit(PoolEvents.taskError, workerError)
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(
+ reject,
+ this.emitter,
+ workerError.message
+ )
+ : reject(workerError.message)
+ } else {
+ asyncResource != null
+ ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+ : resolve(data as Response)
+ }
+ asyncResource?.emitDestroy()
+ this.afterTaskExecutionHook(workerNodeKey, message)
+ this.promiseResponseMap.delete(taskId as string)
+ workerNode?.emit('taskFinished', taskId)
+ if (this.opts.enableTasksQueue === true && !this.destroying) {
+ const workerNodeTasksUsage = workerNode.usage.tasks
+ if (
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ workerNodeTasksUsage.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ if (
+ workerNodeTasksUsage.executing === 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ workerNodeTasksUsage.sequentiallyStolen === 0
+ ) {
+ workerNode.emit('idleWorkerNode', {
+ workerId: workerId as number,
+ workerNodeKey
+ })
+ }