[![Javascript Standard Style Guide](<https://badgen.net/static/code style/standard/green>)](https://standardjs.com)
[![Discord](https://badgen.net/discord/online-members/vXxZhyb3b6?icon=discord&label=discord&color=green)](https://discord.gg/vXxZhyb3b6)
[![Open Collective](https://opencollective.com/poolifier/tiers/badge.svg)](https://opencollective.com/poolifier)
-[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](http://makeapullrequest.com)
+[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](https://makeapullrequest.com)
[![No Dependencies](<https://badgen.net/static/dependencies/no dependencies/green>)](<https://badgen.net/static/dependencies/no dependencies/green>)
</div>
workerUsage.tasks.executing === 0 &&
this.tasksQueueSize(localWorkerNodeKey) === 0)))
) {
+ // Flag the worker as not ready immediately
+ this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
this.emitter?.emit(PoolEvents.error, error)
})
}
}
+ protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+ this.getWorkerInfo(workerNodeKey).ready = false
+ }
+
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return (
/** @inheritDoc */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
const workerNode = this.workerNodes[workerNodeKey]
* @param workerNodeKey - The worker node key.
* @returns Whether the worker node is ready or not.
*/
- private isWorkerNodeReady (workerNodeKey: number): boolean {
- return this.pool.workerNodes[workerNodeKey]?.info.ready
+ protected isWorkerNodeReady (workerNodeKey: number): boolean {
+ return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false
}
/**
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node has back pressure, `false` otherwise.
*/
- private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+ protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
}
- /**
- * Whether the worker node is eligible or not.
- * A worker node is eligible if it is ready and does not have back pressure.
- *
- * @param workerNodeKey - The worker node key.
- * @returns `true` if the worker node is eligible, `false` otherwise.
- * @see {@link isWorkerNodeReady}
- * @see {@link hasWorkerNodeBackPressure}
- */
- protected isWorkerNodeEligible (workerNodeKey: number): boolean {
- return (
- this.isWorkerNodeReady(workerNodeKey) &&
- !this.hasWorkerNodeBackPressure(workerNodeKey)
- )
- }
-
/**
* Gets the worker node task runtime.
* If the task statistics require the average runtime, the average runtime is returned.
this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
}
- /**
- * Check the next worker node eligibility.
- */
- protected checkNextWorkerNodeEligibility (): void {
- if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) {
- delete this.nextWorkerNodeKey
- }
- }
-
protected computeDefaultWorkerWeight (): number {
let cpusCycleTimeWeight = 0
for (const cpu of cpus()) {
this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey)
}
}
- return (workerNode.strategyData.virtualTaskEndTimestamp as number) <
- ((workerNodes[minWorkerNodeKey].strategyData as StrategyData)
- .virtualTaskEndTimestamp as number)
+ return this.isWorkerNodeReady(workerNodeKey) &&
+ (workerNode.strategyData.virtualTaskEndTimestamp as number) <
+ ((workerNodes[minWorkerNodeKey].strategyData as StrategyData)
+ .virtualTaskEndTimestamp as number)
? workerNodeKey
: minWorkerNodeKey
},
private leastBusyNextWorkerNodeKey (): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return (workerNode.usage.runTime.aggregate ?? 0) +
- (workerNode.usage.waitTime.aggregate ?? 0) <
- (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) +
- (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0)
+ return this.isWorkerNodeReady(workerNodeKey) &&
+ (workerNode.usage.runTime.aggregate ?? 0) +
+ (workerNode.usage.waitTime.aggregate ?? 0) <
+ (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) +
+ (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0)
? workerNodeKey
: minWorkerNodeKey
},
private leastEluNextWorkerNodeKey (): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return (workerNode.usage.elu.active.aggregate ?? 0) <
- (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
+ return this.isWorkerNodeReady(workerNodeKey) &&
+ (workerNode.usage.elu.active.aggregate ?? 0) <
+ (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
? workerNodeKey
: minWorkerNodeKey
},
private leastUsedNextWorkerNodeKey (): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
- return workerNode.usage.tasks.executed +
- workerNode.usage.tasks.executing +
- workerNode.usage.tasks.queued <
- workerNodes[minWorkerNodeKey].usage.tasks.executed +
- workerNodes[minWorkerNodeKey].usage.tasks.executing +
- workerNodes[minWorkerNodeKey].usage.tasks.queued
+ return this.isWorkerNodeReady(workerNodeKey) &&
+ workerNode.usage.tasks.executed +
+ workerNode.usage.tasks.executing +
+ workerNode.usage.tasks.queued <
+ workerNodes[minWorkerNodeKey].usage.tasks.executed +
+ workerNodes[minWorkerNodeKey].usage.tasks.executing +
+ workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
: minWorkerNodeKey
},
}
private roundRobinNextWorkerNodeKey (): number | undefined {
- this.nextWorkerNodeKey =
- this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
- ? 0
- : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+ do {
+ this.nextWorkerNodeKey =
+ this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+ ? 0
+ : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+ } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
return this.nextWorkerNodeKey
}
}
}
private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
- const workerWeight =
- this.opts.weights?.[
- this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
- ] ?? this.defaultWorkerWeight
- if (this.workerNodeVirtualTaskRunTime < workerWeight) {
- this.workerNodeVirtualTaskRunTime =
- this.workerNodeVirtualTaskRunTime +
- this.getWorkerNodeTaskRunTime(
+ do {
+ const workerWeight =
+ this.opts.weights?.[
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
- )
- } else {
- this.nextWorkerNodeKey =
- this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
- ? 0
- : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
- this.workerNodeVirtualTaskRunTime = 0
- }
+ ] ?? this.defaultWorkerWeight
+ if (this.workerNodeVirtualTaskRunTime < workerWeight) {
+ this.workerNodeVirtualTaskRunTime =
+ this.workerNodeVirtualTaskRunTime +
+ this.getWorkerNodeTaskRunTime(
+ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ )
+ } else {
+ this.nextWorkerNodeKey =
+ this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
+ ? 0
+ : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+ this.workerNodeVirtualTaskRunTime = 0
+ }
+ } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey as number))
return this.nextWorkerNodeKey
}
}
/** @inheritDoc */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
const workerNode = this.workerNodes[workerNodeKey]
transferList?: TransferListItem[]
): void {
(
- this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
).port1.postMessage(
{ ...message, workerId: this.getWorkerInfo(workerNodeKey).id },
transferList
listener: (message: MessageValue<Message>) => void
): void {
(
- this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
).port1.on('message', listener)
}
listener: (message: MessageValue<Message>) => void
): void {
(
- this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
).port1.once('message', listener)
}
listener: (message: MessageValue<Message>) => void
): void {
(
- this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel
).port1.off('message', listener)
}
readonly usage: WorkerUsage
/**
* Worker choice strategy data.
- * This is used to store data that is specific to the worker choice strategy.
+ * This is used to store data that are specific to the worker choice strategy.
*/
strategyData?: StrategyData
/**