repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix: fix tasks queued count computation
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 2b59abf27cfa2a37a518610afce2998d2dff1c6a..07907be9c6c48bd6eaf73edad4144bba9ada8800 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-21,14
+21,9
@@
import {
type TasksQueueOptions,
type WorkerType
} from './pool'
type TasksQueueOptions,
type WorkerType
} from './pool'
-import type {
- IWorker,
- Task,
- TaskStatistics,
- WorkerNode,
- WorkerUsage
-} from './worker'
+import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
import {
import {
+ Measurements,
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
@@
-199,6
+194,16
@@
export abstract class AbstractPool<
'Invalid worker choice strategy options: must have a weight for each worker node'
)
}
'Invalid worker choice strategy options: must have a weight for each worker node'
)
}
+ if (
+ workerChoiceStrategyOptions.measurement != null &&
+ !Object.values(Measurements).includes(
+ workerChoiceStrategyOptions.measurement
+ )
+ ) {
+ throw new Error(
+ `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+ )
+ }
}
private checkValidTasksQueueOptions (
}
private checkValidTasksQueueOptions (
@@
-207,11
+212,20
@@
export abstract class AbstractPool<
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
throw new TypeError('Invalid tasks queue options: must be a plain object')
}
- if ((tasksQueueOptions?.concurrency as number) <= 0) {
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ !Number.isSafeInteger(tasksQueueOptions.concurrency)
+ ) {
+ throw new TypeError(
+ 'Invalid worker tasks concurrency: must be an integer'
+ )
+ }
+ if (
+ tasksQueueOptions?.concurrency != null &&
+ tasksQueueOptions.concurrency <= 0
+ ) {
throw new Error(
throw new Error(
- `Invalid worker tasks concurrency '${
- tasksQueueOptions.concurrency as number
- }'`
+ `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
)
}
}
)
}
}
@@
-312,10
+326,10
@@
export abstract class AbstractPool<
if (workerChoiceStrategyOptions != null) {
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
if (workerChoiceStrategyOptions != null) {
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
- for (const
workerNode of this.workerNodes
) {
+ for (const
[workerNodeKey, workerNode] of this.workerNodes.entries()
) {
this.setWorkerNodeTasksUsage(
workerNode,
this.setWorkerNodeTasksUsage(
workerNode,
- this.getWorkerUsage(workerNode
.worker
)
+ this.getWorkerUsage(workerNode
Key
)
)
this.setWorkerStatistics(workerNode.worker)
}
)
this.setWorkerStatistics(workerNode.worker)
}
@@
-421,7
+435,6
@@
export abstract class AbstractPool<
} else {
this.executeTask(workerNodeKey, submittedTask)
}
} else {
this.executeTask(workerNodeKey, submittedTask)
}
- this.workerChoiceStrategyContext.update(workerNodeKey)
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
this.checkAndEmitEvents()
// eslint-disable-next-line @typescript-eslint/return-await
return res
@@
-661,7
+674,7
@@
export abstract class AbstractPool<
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
- * Creates a new
ly
worker.
+ * Creates a new worker.
*
* @returns Newly created worker.
*/
*
* @returns Newly created worker.
*/
@@
-717,16
+730,19
@@
export abstract class AbstractPool<
protected createAndSetupDynamicWorker (): Worker {
const worker = this.createAndSetupWorker()
this.registerWorkerMessageListener(worker, message => {
protected createAndSetupDynamicWorker (): Worker {
const worker = this.createAndSetupWorker()
this.registerWorkerMessageListener(worker, message => {
- const
currentW
orkerNodeKey = this.getWorkerNodeKey(worker)
+ const
w
orkerNodeKey = this.getWorkerNodeKey(worker)
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
- 0)
+ ((this.opts.enableTasksQueue === false &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0) ||
+ (this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+ 0 &&
+ this.tasksQueueSize(workerNodeKey) === 0)))
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- this.flushTasksQueue(currentWorkerNodeKey)
- // FIXME: wait for tasks to be finished
void (this.destroyWorker(worker) as Promise<void>)
}
})
void (this.destroyWorker(worker) as Promise<void>)
}
})
@@
-745,10
+761,10
@@
export abstract class AbstractPool<
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
if (message.taskError != null) {
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
if (message.taskError != null) {
- promiseResponse.reject(message.taskError.message)
if (this.emitter != null) {
this.emitter.emit(PoolEvents.taskError, message.taskError)
}
if (this.emitter != null) {
this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
} else {
promiseResponse.resolve(message.data as Response)
}
} else {
promiseResponse.resolve(message.data as Response)
}
@@
-764,6
+780,7
@@
export abstract class AbstractPool<
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
this.dequeueTask(workerNodeKey) as Task<Data>
)
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
}
}
}
}
@@
-800,11
+817,17
@@
export abstract class AbstractPool<
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
-
return
this.workerNodes.push({
+ this.workerNodes.push({
worker,
worker,
- workerUsage: this.getWorkerUsage(
worker
),
+ workerUsage: this.getWorkerUsage(),
tasksQueue: new Queue<Task<Data>>()
})
tasksQueue: new Queue<Task<Data>>()
})
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ this.setWorkerNodeTasksUsage(
+ this.workerNodes[workerNodeKey],
+ this.getWorkerUsage(workerNodeKey)
+ )
+ return this.workerNodes.length
}
// /**
}
// /**
@@
-887,9
+910,19
@@
export abstract class AbstractPool<
})
}
})
}
- private getWorkerUsage (worker: Worker): WorkerUsage {
+ private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
+ const getQueueSize = (workerNodeKey: number): number => {
+ return this.tasksQueueSize(workerNodeKey)
+ }
return {
return {
- tasks: this.getTaskStatistics(worker),
+ tasks: {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return workerNodeKey == null ? 0 : getQueueSize(workerNodeKey)
+ },
+ failed: 0
+ },
runTime: {
aggregate: 0,
average: 0,
runTime: {
aggregate: 0,
average: 0,
@@
-919,17
+952,4
@@
export abstract class AbstractPool<
}
}
}
}
}
}
-
- private getTaskStatistics (worker: Worker): TaskStatistics {
- const queueSize =
- this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
- return {
- executed: 0,
- executing: 0,
- get queued (): number {
- return queueSize ?? 0
- },
- failed: 0
- }
- }
}
}