MessageValue,
PromiseWorkerResponseWrapper
} from '../utility-types'
+import { EMPTY_FUNCTION } from '../utils'
import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
import type { IPoolInternal } from './pool-internal'
-import { PoolEmitter } from './pool-internal'
+import { PoolEmitter, PoolType } from './pool-internal'
import type { WorkerChoiceStrategy } from './selection-strategies'
import {
WorkerChoiceStrategies,
WorkerChoiceStrategyContext
} from './selection-strategies'
-/**
- * An intentional empty function.
- */
-const EMPTY_FUNCTION: () => void = () => {
- /* Intentionally empty */
-}
-
/**
* Callback invoked if the worker raised an error.
*/
* The work choice strategy to use in this pool.
*/
workerChoiceStrategy?: WorkerChoiceStrategy
+ /**
+ * Pool events emission.
+ *
+ * Default to true.
+ */
+ enableEvents?: boolean
}
/**
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
/** @inheritdoc */
- public readonly emitter: PoolEmitter
+ public readonly emitter?: PoolEmitter
+
+ /** @inheritdoc */
+ public readonly max?: number
/**
* The promise map.
*
- * - `key`: This is the message ID of each submitted task.
+ * - `key`: This is the message Id of each submitted task.
* - `value`: An object that contains the worker, the resolve function and the reject function.
*
* When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
> = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
/**
- * ID of the next message.
+ * Id of the next message.
*/
protected nextMessageId: number = 0
}
this.checkNumberOfWorkers(this.numberOfWorkers)
this.checkFilePath(this.filePath)
+ this.checkPoolOptions(this.opts)
this.setupHook()
for (let i = 1; i <= this.numberOfWorkers; i++) {
this.createAndSetupWorker()
}
- this.emitter = new PoolEmitter()
+ if (this.opts.enableEvents) {
+ this.emitter = new PoolEmitter()
+ }
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
this,
() => {
const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener(workerCreated, message => {
+ this.registerWorkerMessageListener(workerCreated, async message => {
const tasksInProgress = this.tasks.get(workerCreated)
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
tasksInProgress === 0
) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.destroyWorker(workerCreated)
+ await this.destroyWorker(workerCreated)
}
})
return workerCreated
},
- opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ this.opts.workerChoiceStrategy
)
}
throw new Error(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (!this.dynamic && numberOfWorkers === 0) {
+ } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
throw new Error('Cannot instantiate a fixed pool with no worker')
}
}
+ private checkPoolOptions (opts: PoolOptions<Worker>): void {
+ this.opts.workerChoiceStrategy =
+ opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ this.opts.enableEvents = opts.enableEvents ?? true
+ }
+
/** @inheritdoc */
- public get dynamic (): boolean {
- return false
+ public abstract get type (): PoolType
+
+ /** @inheritdoc */
+ public get numberOfRunningTasks (): number {
+ return this.promiseMap.size
}
/** @inheritdoc */
)
}
+ /** @inheritdoc */
+ public abstract get busy (): boolean
+
+ protected internalGetBusyStatus (): boolean {
+ return (
+ this.numberOfRunningTasks >= this.numberOfWorkers &&
+ this.findFreeTasksMapEntry() === false
+ )
+ }
+
+ /** @inheritdoc */
+ public findFreeTasksMapEntry (): [Worker, number] | false {
+ for (const [worker, numberOfTasks] of this.tasks) {
+ if (numberOfTasks === 0) {
+ // A worker is free, return the matching tasks map entry
+ return [worker, numberOfTasks]
+ }
+ }
+ return false
+ }
+
/** @inheritdoc */
public execute (data: Data): Promise<Response> {
// Configure worker to handle message with the specified task
const worker = this.chooseWorker()
- this.increaseWorkersTask(worker)
const messageId = ++this.nextMessageId
const res = this.internalExecute(worker, messageId)
+ this.checkAndEmitBusy()
this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
return res
}
protected abstract isMain (): boolean
/**
- * Increase the number of tasks that the given workers has done.
+ * Increase the number of tasks that the given workers has applied.
*
* @param worker Worker whose tasks are increased.
*/
}
/**
- * Decrease the number of tasks that the given workers has done.
+ * Decrease the number of tasks that the given workers has applied.
*
* @param worker Worker whose tasks are decreased.
*/
}
/**
- * Step the number of tasks that the given workers has done.
+ * Step the number of tasks that the given workers has applied.
*
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
worker: Worker,
messageId: number
): Promise<Response> {
+ this.increaseWorkersTask(worker)
return new Promise<Response>((resolve, reject) => {
this.promiseMap.set(messageId, { resolve, reject, worker })
})
}
}
}
+
+ private checkAndEmitBusy (): void {
+ if (this.opts.enableEvents && this.busy) {
+ this.emitter?.emit('busy')
+ }
+ }
}