## [Unreleased]
+### Fixed
+
+- Avoid worker node cross tasks stealing.
+- Ensure only half the pool worker nodes can steal tasks.
+
## [3.1.10] - 2023-12-23
### Changed
: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
})
}
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
if (workerNodeKey === -1) {
return
}
- if (this.workerNodes.length <= 1) {
+ if (this.cannotStealTask()) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey attribute must be defined'
+ 'WorkerNode event detail workerNodeKey property must be defined'
)
}
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ this.getWorkerInfo(workerNodeKey).stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
const { workerId } = eventDetail
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
+ this.getWorkerInfo(workerNodeKey).stealing = true
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
+ this.getWorkerInfo(workerNodeKey).stealing = false
}
}
}
readonly utilization?: number
/** Pool total worker nodes. */
readonly workerNodes: number
+ /** Pool stealing worker nodes. */
+ readonly stealingWorkerNodes?: number
/** Pool idle worker nodes. */
readonly idleWorkerNodes: number
/** Pool busy worker nodes. */
id: getWorkerId(worker),
type: getWorkerType(worker) as WorkerType,
dynamic: false,
- ready: false
+ ready: false,
+ stealing: false
}
}
* Ready flag.
*/
ready: boolean
+ /**
+ * Stealing flag.
+ * This flag is set to `true` when worker node is stealing tasks from another worker node.
+ */
+ stealing: boolean
/**
* Task function names.
*/
id: expect.any(Number),
type: WorkerTypes.cluster,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false
})
}
await pool.destroy()
id: expect.any(Number),
type: WorkerTypes.thread,
dynamic: false,
- ready: true
+ ready: true,
+ stealing: false
})
}
await pool.destroy()
maxSize: expect.any(Number),
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
+ stealingWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
executedTasks: expect.any(Number),
executingTasks: expect.any(Number),
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.callCount).toBe(5)
+ expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
await pool.destroy()
})
id: threadWorkerNode.worker.threadId,
type: WorkerTypes.thread,
dynamic: false,
- ready: false
+ ready: false,
+ stealing: false
})
expect(threadWorkerNode.usage).toStrictEqual({
tasks: {
id: clusterWorkerNode.worker.id,
type: WorkerTypes.cluster,
dynamic: false,
- ready: false
+ ready: false,
+ stealing: false
})
expect(clusterWorkerNode.usage).toStrictEqual({
tasks: {
import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
describe('Cluster worker test suite', () => {
- const sendStub = stub().returns()
- class SpyWorker extends ClusterWorker {
- getMainWorker () {
- return { send: sendStub }
- }
- }
-
afterEach(() => {
restore()
})
send: stub().returns()
})
worker.handleKillMessage()
+ expect(worker.getMainWorker.calledTwice).toBe(true)
expect(worker.getMainWorker().send.calledOnce).toBe(true)
expect(worker.opts.killHandler.calledOnce).toBe(true)
})
return 2
}
const worker = new ClusterWorker({ fn1, fn2 })
+ worker.getMainWorker = stub().returns({
+ id: 1,
+ send: stub().returns()
+ })
expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
status: false,
error: new TypeError('name parameter is not a string')
status: false,
error: new TypeError('name parameter is an empty string')
})
- worker.getMainWorker = stub().returns({
- id: 1,
- send: stub().returns()
- })
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn2')).toBeUndefined()
expect(worker.taskFunctions.size).toBe(2)
+ expect(worker.getMainWorker.calledTwice).toBe(true)
expect(worker.getMainWorker().send.calledOnce).toBe(true)
})
expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
})
- it('Verify worker invokes the getMainWorker() and send() methods', () => {
- const worker = new SpyWorker(() => {})
+ it('Verify that sendToMainWorker() method invokes the getMainWorker() and send() methods', () => {
+ const worker = new ClusterWorker(() => {})
+ worker.getMainWorker = stub().returns({
+ send: stub().returns()
+ })
worker.sendToMainWorker({ ok: 1 })
+ expect(worker.getMainWorker.calledTwice).toBe(true)
expect(worker.getMainWorker().send.calledOnce).toBe(true)
})
})
import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
describe('Thread worker test suite', () => {
- class SpyWorker extends ThreadWorker {
- constructor (fn) {
- super(fn)
- this.port = { postMessage: stub().returns() }
- }
- }
-
afterEach(() => {
restore()
})
return 2
}
const worker = new ThreadWorker({ fn1, fn2 })
+ worker.port = {
+ postMessage: stub().returns()
+ }
expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({
status: false,
error: new TypeError('name parameter is not a string')
status: false,
error: new TypeError('name parameter is an empty string')
})
- worker.port = {
- postMessage: stub().returns()
- }
expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
})
- it('Verify worker invokes the postMessage() method on port property', () => {
- const worker = new SpyWorker(() => {})
+ it('Verify that sendToMainWorker() method invokes the port property postMessage() method', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.port = { postMessage: stub().returns() }
worker.sendToMainWorker({ ok: 1 })
expect(worker.port.postMessage.calledOnce).toBe(true)
})