+ const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
+ this.sendToWorker(workerNodeKey, {
+ checkActive: true,
+ workerId: workerInfo.id as number
+ })
+ workerInfo.dynamic = true
+ if (
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
+ this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
+ ) {
+ workerInfo.ready = true
+ }
+ this.checkAndEmitDynamicWorkerCreationEvents()
+ return workerNodeKey
+ }
+
+ /**
+ * Registers a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Method hooked up after a worker node has been newly created.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The newly created worker node key.
+ */
+ protected afterWorkerNodeSetup (workerNodeKey: number): void {
+ // Listen to worker messages.
+ this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
+ // Send the startup message to worker.
+ this.sendStartupMessageToWorker(workerNodeKey)
+ // Send the statistics message to worker.
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ if (this.opts.enableTasksQueue === true) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
+ }
+
+ /**
+ * Sends the startup message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
+
+ /**
+ * Sends the statistics message to worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ private sendStatisticsMessageToWorker (workerNodeKey: number): void {
+ this.sendToWorker(workerNodeKey, {
+ statistics: {
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu.aggregate
+ },
+ workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
+ })
+ }
+
+ private redistributeQueuedTasks (workerNodeKey: number): void {
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let destinationWorkerNodeKey!: number
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
+ if (workerNode.usage.tasks.queued === 0) {
+ destinationWorkerNodeKey = workerNodeId
+ break
+ }
+ if (workerNode.usage.tasks.queued < minQueuedTasks) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ destinationWorkerNodeKey = workerNodeId
+ }
+ }
+ }
+ if (destinationWorkerNodeKey != null) {
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
+ const task = {
+ ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
+ if (
+ this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+ destinationWorkerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
+ }
+ }
+ }
+
+ private taskStealingOnEmptyQueue (workerId: number): void {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
+ )
+ for (const sourceWorkerNode of workerNodes) {
+ if (sourceWorkerNode.usage.tasks.queued === 0) {
+ break
+ }
+ if (
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNode.info.id !== workerId &&
+ sourceWorkerNode.usage.tasks.queued > 0
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
+ if (
+ this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+ destinationWorkerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
+ break
+ }
+ }
+ }
+
+ private tasksStealingOnBackPressure (workerId: number): void {
+ const sourceWorkerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+ )
+ for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ if (
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued <
+ (this.opts.tasksQueueOptions?.size as number) - 1
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: workerNode.info.id as number
+ }
+ if (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ workerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ }
+ }