Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
- Rename tasks queue options `queueMaxSize` to `size`.
- Rename tasks queue options `queueMaxSize` to `size`.
-- Task stealing scheduling algorithm if tasks queueing is enabled. -->
+- Task stealing scheduling algorithm if tasks queueing is enabled.
- Tasks distribution strategies :white_check_mark:
- Lockless tasks queueing :white_check_mark:
- Queued tasks rescheduling:
- Tasks distribution strategies :white_check_mark:
- Lockless tasks queueing :white_check_mark:
- Queued tasks rescheduling:
- <!-- - Task stealing :white_check_mark: -->
+ - Task stealing :white_check_mark:
- Tasks stealing under back pressure :white_check_mark:
- Tasks redistribution on worker error :white_check_mark:
- General guidelines on pool choice :white_check_mark:
- Tasks stealing under back pressure :white_check_mark:
- Tasks redistribution on worker error :white_check_mark:
- General guidelines on pool choice :white_check_mark:
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
- // this.workerNodes[workerNodeKey].onEmptyQueue =
- // this.taskStealingOnEmptyQueue.bind(this)
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
this.workerNodes[workerNodeKey].onBackPressure =
this.tasksStealingOnBackPressure.bind(this)
}
this.workerNodes[workerNodeKey].onBackPressure =
this.tasksStealingOnBackPressure.bind(this)
}
}
private redistributeQueuedTasks (workerNodeKey: number): void {
}
private redistributeQueuedTasks (workerNodeKey: number): void {
- const workerNodes = this.workerNodes.filter(
- (workerNode, workerNodeId) =>
- workerNode.info.ready && workerNodeId !== workerNodeKey
- )
while (this.tasksQueueSize(workerNodeKey) > 0) {
let destinationWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
while (this.tasksQueueSize(workerNodeKey) > 0) {
let destinationWorkerNodeKey: number = workerNodeKey
let minQueuedTasks = Infinity
let executeTask = false
- for (const [workerNodeId, workerNode] of workerNodes.entries()) {
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
workerNode.usage.tasks.executing <
workerNode.usage.tasks.executing <
- (this.opts.tasksQueueOptions?.concurrency as number)
+ (this.opts.tasksQueueOptions?.concurrency as number)
- if (workerNode.usage.tasks.queued === 0) {
+ if (
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
destinationWorkerNodeKey = workerNodeId
break
}
destinationWorkerNodeKey = workerNodeId
break
}
- if (workerNode.usage.tasks.queued < minQueuedTasks) {
+ if (
+ workerNode.info.ready &&
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
minQueuedTasks = workerNode.usage.tasks.queued
destinationWorkerNodeKey = workerNodeId
}
minQueuedTasks = workerNode.usage.tasks.queued
destinationWorkerNodeKey = workerNodeId
}
}
private taskStealingOnEmptyQueue (workerId: number): void {
}
private taskStealingOnEmptyQueue (workerId: number): void {
+ const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+ const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const workerNodes = this.workerNodes
const workerNodes = this.workerNodes
- .filter(
- (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
- )
.sort(
(workerNodeA, workerNodeB) =>
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
.sort(
(workerNodeA, workerNodeB) =>
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
)
- const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
- const destinationWorkerNode = workerNodes[destinationWorkerNodeKey]
for (const sourceWorkerNode of workerNodes) {
for (const sourceWorkerNode of workerNodes) {
- if (sourceWorkerNode.usage.tasks.queued > 0) {
+ if (
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNode.info.id !== workerId &&
+ sourceWorkerNode.usage.tasks.queued > 0
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
- destinationWorkerNode?.usage?.tasks?.executing <
+ destinationWorkerNode.usage.tasks.executing <
(this.opts.tasksQueueOptions?.concurrency as number)
) {
(this.opts.tasksQueueOptions?.concurrency as number)
) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
this.executeTask(destinationWorkerNodeKey, task)
} else {
this.executeTask(destinationWorkerNodeKey, task)
} else {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
this.enqueueTask(destinationWorkerNodeKey, task)
}
break
this.enqueueTask(destinationWorkerNodeKey, task)
}
break
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
const sourceWorkerNode =
this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
const workerNodes = this.workerNodes
- .filter(
- (workerNode) => workerNode.info.ready && workerNode.info.id !== workerId
- )
.sort(
(workerNodeA, workerNodeB) =>
workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
)
for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
if (
.sort(
(workerNodeA, workerNodeB) =>
workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
)
for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
if (
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
sourceWorkerNode.usage.tasks.queued > 0 &&
!workerNode.hasBackPressure()
) {
sourceWorkerNode.usage.tasks.queued > 0 &&
!workerNode.hasBackPressure()
) {
expect(queuePool.info.backPressure).toBe(false)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(queuePool.info.backPressure).toBe(false)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.usage.tasks.executing).toBe(0)
+ expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
expect(workerNode.usage.tasks.queued).toBe(0)
expect(workerNode.usage.tasks.maxQueued).toBe(
expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
expect(workerNode.usage.tasks.queued).toBe(0)
expect(workerNode.usage.tasks.maxQueued).toBe(
expect(queuePool.info.backPressure).toBe(false)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
expect(queuePool.info.backPressure).toBe(false)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.usage.tasks.executing).toBe(0)
+ expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
+ numberOfThreads * maxMultiplier
+ )
expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
expect(workerNode.usage.tasks.queued).toBe(0)
expect(workerNode.usage.tasks.maxQueued).toBe(
expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
expect(workerNode.usage.tasks.queued).toBe(0)
expect(workerNode.usage.tasks.maxQueued).toBe(