Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
+ /**
+ * Returns an backward iterator for the deque.
+ *
+ * @returns An backward iterator for the deque.
+ * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
+ */
backward (): Iterable<T> {
return {
[Symbol.iterator]: (): Iterator<T> => {
backward (): Iterable<T> {
return {
[Symbol.iterator]: (): Iterator<T> => {
}
private redistributeQueuedTasks (workerNodeKey: number): void {
}
private redistributeQueuedTasks (workerNodeKey: number): void {
+ const workerNodes = this.workerNodes.filter(
+ (_, workerNodeId) => workerNodeId !== workerNodeKey
+ )
while (this.tasksQueueSize(workerNodeKey) > 0) {
let targetWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
while (this.tasksQueueSize(workerNodeKey) > 0) {
let targetWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
- for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ for (const [workerNodeId, workerNode] of workerNodes.entries()) {
if (
this.workerNodes[workerNodeId].usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
) {
executeTask = true
}
if (
this.workerNodes[workerNodeId].usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
) {
executeTask = true
}
- if (
- workerNodeId !== workerNodeKey &&
- workerNode.info.ready &&
- workerNode.usage.tasks.queued === 0
- ) {
+ if (workerNode.info.ready && workerNode.usage.tasks.queued === 0) {
targetWorkerNodeKey = workerNodeId
break
}
if (
targetWorkerNodeKey = workerNodeId
break
}
if (
- workerNodeId !== workerNodeKey &&
workerNode.info.ready &&
workerNode.usage.tasks.queued < minQueuedTasks
) {
workerNode.info.ready &&
workerNode.usage.tasks.queued < minQueuedTasks
) {
if (
workerNode.info.ready &&
sourceWorkerNode.usage.tasks.queued > 0 &&
if (
workerNode.info.ready &&
sourceWorkerNode.usage.tasks.queued > 0 &&
- !workerNode.hasBackPressure() &&
- workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ !workerNode.hasBackPressure()
- this.executeTask(
- workerNodeKey,
- sourceWorkerNode.popTask() as Task<Data>
- )
- } else if (
- workerNode.info.ready &&
- sourceWorkerNode.usage.tasks.queued > 0 &&
- !workerNode.hasBackPressure() &&
- workerNode.usage.tasks.executing >=
+ if (
+ workerNode.usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
(this.opts.tasksQueueOptions?.concurrency as number)
- ) {
- this.enqueueTask(
- workerNodeKey,
- sourceWorkerNode.popTask() as Task<Data>
- )
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
+ )
+ } else {
+ this.enqueueTask(
+ workerNodeKey,
+ sourceWorkerNode.popTask() as Task<Data>
+ )
+ }