repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
build(deps-dev): apply updates
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 56bd0348829590429dc465944fef22b9f28c4b4c..c8d4551167d28d0bcf64cf3e7afb082dff53841d 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-130,10
+130,10
@@
export abstract class AbstractPool<
/**
* Constructs a new poolifier pool.
*
/**
* Constructs a new poolifier pool.
*
- * @param minimumNumberOfWorkers - Minimum number of workers that this pool
should manage
.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool
manages
.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
- * @param maximumNumberOfWorkers - Maximum number of workers that this pool
should manage
.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool
manages
.
*/
public constructor (
protected readonly minimumNumberOfWorkers: number,
*/
public constructor (
protected readonly minimumNumberOfWorkers: number,
@@
-191,20
+191,20
@@
export abstract class AbstractPool<
}
}
}
}
- private checkMinimumNumberOfWorkers (
n
umberOfWorkers: number): void {
- if (
n
umberOfWorkers == null) {
+ private checkMinimumNumberOfWorkers (
minimumN
umberOfWorkers: number): void {
+ if (
minimumN
umberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
- } else if (!Number.isSafeInteger(
n
umberOfWorkers)) {
+ } else if (!Number.isSafeInteger(
minimumN
umberOfWorkers)) {
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
- } else if (
n
umberOfWorkers < 0) {
+ } else if (
minimumN
umberOfWorkers < 0) {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolTypes.fixed &&
n
umberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed &&
minimumN
umberOfWorkers === 0) {
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
@@
-298,6
+298,13
@@
export abstract class AbstractPool<
: accumulator,
0
),
: accumulator,
0
),
+ ...(this.opts.enableTasksQueue === true && {
+ stealingWorkerNodes: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ workerNode.info.stealing ? accumulator + 1 : accumulator,
+ 0
+ )
+ }),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, _workerNode, workerNodeKey) =>
this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@
-984,8
+991,8
@@
export abstract class AbstractPool<
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
- if (
workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length
) {
- re
ject(new Error(`Invalid worker node key '${workerNodeKey}'`)
)
+ if (
this.workerNodes?.[workerNodeKey] == null
) {
+ re
solve(
)
return
}
const killMessageListener = (message: MessageValue<Response>): void => {
return
}
const killMessageListener = (message: MessageValue<Response>): void => {
@@
-1178,9
+1185,7
@@
export abstract class AbstractPool<
*
* @returns Whether to create a dynamic worker or not.
*/
*
* @returns Whether to create a dynamic worker or not.
*/
- private shallCreateDynamicWorker (): boolean {
- return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
- }
+ protected abstract shallCreateDynamicWorker (): boolean
/**
* Sends a message to worker given its worker node key.
/**
* Sends a message to worker given its worker node key.
@@
-1399,6
+1404,10
@@
export abstract class AbstractPool<
})
}
})
}
+ private cannotStealTask (): boolean {
+ return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+ }
+
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
private handleTask (workerNodeKey: number, task: Task<Data>): void {
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
@@
-1411,7
+1420,7
@@
export abstract class AbstractPool<
if (workerNodeKey === -1) {
return
}
if (workerNodeKey === -1) {
return
}
- if (this.
workerNodes.length <= 1
) {
+ if (this.
cannotStealTask()
) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
return
}
while (this.tasksQueueSize(workerNodeKey) > 0) {
@@
-1505,15
+1514,22
@@
export abstract class AbstractPool<
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
eventDetail: WorkerNodeEventDetail,
previousStolenTask?: Task<Data>
): void => {
- if (this.workerNodes.length <= 1) {
- return
- }
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
const { workerNodeKey } = eventDetail
if (workerNodeKey == null) {
throw new Error(
- 'WorkerNode event detail workerNodeKey
attribute
must be defined'
+ 'WorkerNode event detail workerNodeKey
property
must be defined'
)
}
)
}
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
+ if (previousStolenTask != null) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
+ }
+ return
+ }
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
if (
previousStolenTask != null &&
@@
-1521,6
+1537,7
@@
export abstract class AbstractPool<
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
(workerNodeTasksUsage.executing > 0 ||
this.tasksQueueSize(workerNodeKey) > 0)
) {
+ this.getWorkerInfo(workerNodeKey).stealing = false
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
for (const taskName of this.workerNodes[workerNodeKey].info
.taskFunctionNames as string[]) {
this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
@@
-1531,6
+1548,7
@@
export abstract class AbstractPool<
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
return
}
+ this.getWorkerInfo(workerNodeKey).stealing = true
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
const stolenTask = this.workerNodeStealTask(workerNodeKey)
if (
this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@
-1577,6
+1595,7
@@
export abstract class AbstractPool<
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
const sourceWorkerNode = workerNodes.find(
(sourceWorkerNode, sourceWorkerNodeKey) =>
sourceWorkerNode.info.ready &&
+ !sourceWorkerNode.info.stealing &&
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
sourceWorkerNodeKey !== workerNodeKey &&
sourceWorkerNode.usage.tasks.queued > 0
)
@@
-1595,7
+1614,11
@@
export abstract class AbstractPool<
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
private readonly handleBackPressureEvent = (
eventDetail: WorkerNodeEventDetail
): void => {
- if (this.workerNodes.length <= 1) {
+ if (
+ this.cannotStealTask() ||
+ (this.info.stealingWorkerNodes as number) >
+ Math.floor(this.workerNodes.length / 2)
+ ) {
return
}
const { workerId } = eventDetail
return
}
const { workerId } = eventDetail
@@
-1615,16
+1638,19
@@
export abstract class AbstractPool<
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
if (
sourceWorkerNode.usage.tasks.queued > 0 &&
workerNode.info.ready &&
+ !workerNode.info.stealing &&
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
workerNode.info.id !== workerId &&
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
+ this.getWorkerInfo(workerNodeKey).stealing = true
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
const task = sourceWorkerNode.popTask() as Task<Data>
this.handleTask(workerNodeKey, task)
this.updateTaskStolenStatisticsWorkerUsage(
workerNodeKey,
task.name as string
)
+ this.getWorkerInfo(workerNodeKey).stealing = false
}
}
}
}
}
}
@@
-1729,13
+1755,10
@@
export abstract class AbstractPool<
}
}
}
}
- private checkAndEmitDynamicWorkerCreationEvents (): void {
- if (this.type === PoolTypes.dynamic) {
- if (this.full) {
- this.emitter?.emit(PoolEvents.full, this.info)
- }
- }
- }
+ /**
+ * Emits dynamic worker creation events.
+ */
+ protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
* Gets the worker information given its worker node key.
/**
* Gets the worker information given its worker node key.