+ private redistributeQueuedTasks (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
+ /**
+ * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+ *
+ * @returns New, completely set up dynamic worker.
+ */
+ protected createAndSetupDynamicWorker (): Worker {
+ const worker = this.createAndSetupWorker()
+ this.registerWorkerMessageListener(worker, message => {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ (message.kill != null &&
+ ((this.opts.enableTasksQueue === false &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
+ (this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0)))
+ ) {
+ // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ void (this.destroyWorker(worker) as Promise<void>)
+ }
+ })
+ const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+ workerInfo.dynamic = true
+ this.sendToWorker(worker, {
+ checkAlive: true,
+ workerId: workerInfo.id as number
+ })
+ return worker
+ }
+