import {
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
+ isKillBehavior,
isPlainObject,
- median
+ median,
+ round
} from '../utils'
-import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
+import { KillBehaviors } from '../worker/worker-options'
import { CircularArray } from '../circular-array'
import { Queue } from '../queue'
import {
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types'
import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
/**
* Base class that implements some shared logic for all poolifier pools.
Response
>
+ /**
+ * The start timestamp of the pool.
+ */
+ private readonly startTimestamp
+
/**
* Constructs a new poolifier pool.
*
this.setupHook()
- for (let i = 1; i <= this.numberOfWorkers; i++) {
+ while (this.workerNodes.length < this.numberOfWorkers) {
this.createAndSetupWorker()
}
+
+ this.startTimestamp = performance.now()
}
private checkFilePath (filePath: string): void {
}
}
- private get starting (): boolean {
- return this.workerNodes.some(workerNode => !workerNode.info.started)
- }
-
- private get started (): boolean {
- return this.workerNodes.some(workerNode => workerNode.info.started)
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
+ version,
type: this.type,
worker: this.worker,
minSize: this.minSize,
maxSize: this.maxSize,
+ ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime.aggregate &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
}
}
+ /**
+ * Gets the approximate pool utilization.
+ *
+ * @returns The pool utilization.
+ */
+ private get utilization (): number {
+ const poolRunTimeCapacity =
+ (performance.now() - this.startTimestamp) * this.maxSize
+ const totalTasksRunTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.runTime.aggregate,
+ 0
+ )
+ const totalTasksWaitTime = this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.usage.waitTime.aggregate,
+ 0
+ )
+ return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
+ }
+
/**
* Pool type.
*
this.workerNodes.map(async (workerNode, workerNodeKey) => {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
+ const workerExitPromise = new Promise<void>(resolve => {
+ workerNode.worker.on('exit', () => {
+ resolve()
+ })
+ })
await this.destroyWorker(workerNode.worker)
+ await workerExitPromise
})
)
}
if (this.emitter != null) {
this.emitter.emit(PoolEvents.error, error)
}
- if (this.opts.restartWorkerOnError === true && !this.starting) {
+ if (this.opts.enableTasksQueue === true) {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ let targetWorkerNodeKey: number = workerNodeKey
+ let minQueuedTasks = Infinity
+ for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued === 0
+ ) {
+ targetWorkerNodeKey = workerNodeId
+ break
+ }
+ if (
+ workerNodeId !== workerNodeKey &&
+ workerNode.usage.tasks.queued < minQueuedTasks
+ ) {
+ minQueuedTasks = workerNode.usage.tasks.queued
+ targetWorkerNodeKey = workerNodeId
+ }
+ }
+ this.enqueueTask(
+ targetWorkerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ }
+ if (this.opts.restartWorkerOnError === true) {
this.createAndSetupWorker()
}
})
return message => {
if (message.workerId != null && message.started != null) {
// Worker started message received
- const worker = this.getWorkerById(message.workerId)
- if (worker != null) {
- this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
- message.started
- } else {
- throw new Error('Worker started message received from unknown worker')
- }
+ this.handleWorkerStartedMessage(message)
} else if (message.id != null) {
// Task execution response received
- const promiseResponse = this.promiseResponseMap.get(message.id)
- if (promiseResponse != null) {
- if (message.taskError != null) {
- if (this.emitter != null) {
- this.emitter.emit(PoolEvents.taskError, message.taskError)
- }
- promiseResponse.reject(message.taskError.message)
- } else {
- promiseResponse.resolve(message.data as Response)
- }
- this.afterTaskExecutionHook(promiseResponse.worker, message)
- this.promiseResponseMap.delete(message.id)
- const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
- if (
- this.opts.enableTasksQueue === true &&
- this.tasksQueueSize(workerNodeKey) > 0
- ) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
- this.workerChoiceStrategyContext.update(workerNodeKey)
+ this.handleTaskExecutionResponse(message)
+ }
+ }
+ }
+
+ private handleWorkerStartedMessage (message: MessageValue<Response>): void {
+ // Worker started message received
+ const worker = this.getWorkerById(message.workerId as number)
+ if (worker != null) {
+ this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+ message.started as boolean
+ } else {
+ throw new Error(
+ `Worker started message received from unknown worker '${
+ message.workerId as number
+ }'`
+ )
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const promiseResponse = this.promiseResponseMap.get(message.id as string)
+ if (promiseResponse != null) {
+ if (message.taskError != null) {
+ if (this.emitter != null) {
+ this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
+ } else {
+ promiseResponse.resolve(message.data as Response)
+ }
+ this.afterTaskExecutionHook(promiseResponse.worker, message)
+ this.promiseResponseMap.delete(message.id as string)
+ const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
+ this.workerChoiceStrategyContext.update(workerNodeKey)
}
}
private pushWorkerNode (worker: Worker): number {
this.workerNodes.push({
worker,
- info: { id: this.getWorkerId(worker), started: false },
+ info: { id: this.getWorkerId(worker), started: true },
usage: this.getWorkerUsage(),
tasksQueue: new Queue<Task<Data>>()
})
}
private flushTasksQueue (workerNodeKey: number): void {
- if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
- this.executeTask(
- workerNodeKey,
- this.dequeueTask(workerNodeKey) as Task<Data>
- )
- }
+ while (this.tasksQueueSize(workerNodeKey) > 0) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
this.workerNodes[workerNodeKey].tasksQueue.clear()
}