type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
/**
* Base class that implements some shared logic for all poolifier pools.
/** @inheritDoc */
public get info (): PoolInfo {
return {
+ version,
type: this.type,
worker: this.worker,
minSize: this.minSize,
}
}
- /**
- * Gets the pool run time.
- *
- * @returns The pool run time in milliseconds.
- */
- private get runTime (): number {
- return performance.now() - this.startTimestamp
- }
-
/**
* Gets the approximate pool utilization.
*
* @returns The pool utilization.
*/
private get utilization (): number {
- const poolRunTimeCapacity = this.runTime * this.maxSize
+ const poolRunTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.usage.runTime.aggregate,
this.workerNodes.map(async (workerNode, workerNodeKey) => {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
+ const workerExitPromise = new Promise<void>(resolve => {
+ workerNode.worker.on('exit', () => {
+ resolve()
+ })
+ })
await this.destroyWorker(workerNode.worker)
+ await workerExitPromise
})
)
}
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
+ if (this.opts.enableTasksQueue === true) {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
if (this.opts.restartWorkerOnError === true) {
this.createAndSetupWorker()
}
}
private flushTasksQueue (workerNodeKey: number): void {
- if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
this.workerNodes[workerNodeKey].tasksQueue.clear()
}