* Constructs a new poolifier pool.
*
* @param numberOfWorkers - Number of workers that this pool should manage.
- * @param filePath - Path to the worker-file.
+ * @param filePath - Path to the worker file.
* @param opts - Options for the pool.
*/
public constructor (
this.checkFilePath(this.filePath)
this.checkPoolOptions(this.opts)
- this.chooseWorkerNode.bind(this)
- this.executeTask.bind(this)
- this.enqueueTask.bind(this)
- this.checkAndEmitEvents.bind(this)
+ this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
+ this.executeTask = this.executeTask.bind(this)
+ this.enqueueTask = this.enqueueTask.bind(this)
+ this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
this.setupHook()
tasksQueueOptions?: TasksQueueOptions
): void {
if (this.opts.enableTasksQueue === true && !enable) {
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.flushTasksQueue(workerNodeKey)
- }
+ this.flushTasksQueues()
}
this.opts.enableTasksQueue = enable
this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/** @inheritDoc */
- public async execute (data: Data): Promise<Response> {
+ public findLastFreeWorkerNodeKey (): number {
+ // It requires node >= 18.0.0
+ // return this.workerNodes.findLastIndex(workerNode => {
+ // return workerNode.tasksUsage?.running === 0
+ // })
+ for (let i = this.workerNodes.length - 1; i >= 0; i--) {
+ if (this.workerNodes[i].tasksUsage?.running === 0) {
+ return i
+ }
+ }
+ return -1
+ }
+
+ /** @inheritDoc */
+ public async execute (data?: Data): Promise<Response> {
const [workerNodeKey, workerNode] = this.chooseWorkerNode()
const submittedTask: Task<Data> = {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const workerNodeKey = this.getWorkerNodeKey(worker)
this.flushTasksQueue(workerNodeKey)
}
+
+ private flushTasksQueues (): void {
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.flushTasksQueue(workerNodeKey)
+ }
+ }
}