this.emitter.emit(PoolEvents.error, error)
}
if (this.opts.enableTasksQueue === true) {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- while (this.tasksQueueSize(workerNodeKey) > 0) {
- let targetWorkerNodeKey: number = workerNodeKey
- let minQueuedTasks = Infinity
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued === 0
- ) {
- targetWorkerNodeKey = workerNodeId
- break
- }
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.usage.tasks.queued < minQueuedTasks
- ) {
- minQueuedTasks = workerNode.usage.tasks.queued
- targetWorkerNodeKey = workerNodeId
- }
- }
- this.enqueueTask(
- targetWorkerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ this.redistributeQueuedTasks(worker)
}
if (this.opts.restartWorkerOnError === true) {
if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
return worker
}
+ private redistributeQueuedTasks (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+
/**
* Creates a new dynamic worker and sets it up completely in the pool worker nodes.
*