+ this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ })
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ this.sendToWorker(workerNodeKey, {
+ checkActive: true
+ })
+ if (this.taskFunctions.size > 0) {
+ for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+ taskFunctionOperation: 'add',
+ taskFunctionName,
+ taskFunction: taskFunction.toString()
+ }).catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ }
+ workerInfo.dynamic = true
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ ) {
+ workerInfo.ready = true
+ }
+ this.checkAndEmitDynamicWorkerCreationEvents()
+ return workerNodeKey
+ }
+
+ /**
+ * Registers a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Registers once a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerOnceWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Deregisters a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract deregisterWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Method hooked up after a worker node has been newly created.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The newly created worker node key.
+ */
+ protected afterWorkerNodeSetup (workerNodeKey: number): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
+ // Send the startup message to worker.
+ this.sendStartupMessageToWorker(workerNodeKey)
+ // Send the statistics message to worker.
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (this.opts.enableTasksQueue === true) {
+ if (this.opts.tasksQueueOptions?.taskStealing === true) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
+ }
+ }
+
+ /**
+ * Sends the startup message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
+
+ /**
+ * Sends the statistics message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ private sendStatisticsMessageToWorker (workerNodeKey: number): void {
+ this.sendToWorker(workerNodeKey, {
+ statistics: {
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu.aggregate