* Constructs a new poolifier pool.
*
* @param numberOfWorkers - Number of workers that this pool should manage.
- * @param filePath - Path to the worker-file.
+ * @param filePath - Path to the worker file.
* @param opts - Options for the pool.
*/
public constructor (
this.checkFilePath(this.filePath)
this.checkPoolOptions(this.opts)
- this.chooseWorkerNode.bind(this)
- this.executeTask.bind(this)
- this.enqueueTask.bind(this)
- this.checkAndEmitEvents.bind(this)
+ this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+ this.executeTask = this.executeTask.bind(this)
+ this.enqueueTask = this.enqueueTask.bind(this)
+ this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
this.setupHook()
}
/** @inheritDoc */
- public enableTasksQueue (enable: boolean, opts?: TasksQueueOptions): void {
+ public enableTasksQueue (
+ enable: boolean,
+ tasksQueueOptions?: TasksQueueOptions
+ ): void {
if (this.opts.enableTasksQueue === true && !enable) {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.flushTasksQueue(workerNodeKey)
- }
+ this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
- this.setTasksQueueOptions(opts as TasksQueueOptions)
+ this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/** @inheritDoc */
- public setTasksQueueOptions (opts: TasksQueueOptions): void {
+ public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
if (this.opts.enableTasksQueue === true) {
- this.checkValidTasksQueueOptions(opts)
- this.opts.tasksQueueOptions = this.buildTasksQueueOptions(opts)
+ this.checkValidTasksQueueOptions(tasksQueueOptions)
+ this.opts.tasksQueueOptions =
+ this.buildTasksQueueOptions(tasksQueueOptions)
} else {
delete this.opts.tasksQueueOptions
}
}
/** @inheritDoc */
- public async execute (data: Data): Promise<Response> {
+ public findLastFreeWorkerNodeKey (): number {
+ // It requires node >= 18.0.0
+ // return this.workerNodes.findLastIndex(workerNode => {
+ // return workerNode.tasksUsage?.running === 0
+ // })
+ for (let i = this.workerNodes.length - 1; i >= 0; i--) {
+ if (this.workerNodes[i].tasksUsage?.running === 0) {
+ return i
+ }
+ }
+ return -1
+ }
+
+ /** @inheritDoc */
+ public async execute (data?: Data): Promise<Response> {
const [workerNodeKey, workerNode] = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
* Gets the given worker its tasks usage in the pool.
*
* @param worker - The worker.
- * @throws {@link Error} if the worker is not found in the pool worker nodes.
+ * @throws Error if the worker is not found in the pool worker nodes.
* @returns The worker tasks usage.
*/
private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
const workerNodeKey = this.getWorkerNodeKey(worker)
this.flushTasksQueue(workerNodeKey)
}
+
+ private flushTasksQueues (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }
}