- protected workerListener (): (message: MessageValue<Response>) => void {
- return message => {
- if (message.id != null) {
- // Task execution response received
- const promiseResponse = this.promiseResponseMap.get(message.id)
- if (promiseResponse != null) {
- if (message.error != null) {
- promiseResponse.reject(message.error)
- if (this.emitter != null) {
- this.emitter.emit(PoolEvents.taskError, {
- error: message.error,
- errorData: message.errorData
- })
- }
- } else {
- promiseResponse.resolve(message.data as Response)
- }
- this.afterTaskExecutionHook(promiseResponse.worker, message)
- this.promiseResponseMap.delete(message.id)
- const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ protected createAndSetupDynamicWorkerNode (): number {
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ this.registerWorkerMessageListener(workerNodeKey, (message) => {
+ const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+ message.workerId
+ )
+ const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+ // Kill message received from worker
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
+ ((this.opts.enableTasksQueue === false &&
+ workerUsage.tasks.executing === 0) ||
+ (this.opts.enableTasksQueue === true &&
+ workerUsage.tasks.executing === 0 &&
+ this.tasksQueueSize(localWorkerNodeKey) === 0)))
+ ) {
+ this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ })
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ this.sendToWorker(workerNodeKey, {
+ checkActive: true,
+ workerId: workerInfo.id as number
+ })
+ workerInfo.dynamic = true
+ if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
+ workerInfo.ready = true
+ }
+ return workerNodeKey
+ }
+
+ /**
+ * Registers a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Method hooked up after a worker node has been newly created.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The newly created worker node key.
+ */
+ protected afterWorkerNodeSetup (workerNodeKey: number): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
+ // Send the startup message to worker.
+ this.sendStartupMessageToWorker(workerNodeKey)
+ // Send the worker statistics message to worker.
+ this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
+ }
+
+ /**
+ * Sends the startup message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
+
+ /**
+ * Sends the worker statistics message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
+ this.sendToWorker(workerNodeKey, {
+ statistics: {
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu.aggregate
+ },
+ workerId: this.getWorkerInfo(workerNodeKey).id as number
+ })
+ }
+
+ private redistributeQueuedTasks (workerNodeKey: number): void {
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ let executeTask = false
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ const workerInfo = this.getWorkerInfo(workerNodeId)
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerInfo.ready &&
+ workerNode.usage.tasks.queued === 0
+ ) {