-import EventEmitter from 'events'
import type { MessageValue } from '../utility-types'
-import type { IPool } from './pool'
+import type { IPoolInternal } from './pool-internal'
+import { PoolEmitter } from './pool-internal'
+import type { WorkerChoiceStrategy } from './selection-strategies'
+import {
+ WorkerChoiceStrategies,
+ WorkerChoiceStrategyContext
+} from './selection-strategies'
/**
* An intentional empty function.
*/
-function emptyFunction () {
- // intentionally left blank
+const EMPTY_FUNCTION: () => void = () => {
+ /* Intentionally empty */
}
/**
* @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
*/
maxTasks?: number
+ /**
+ * The work choice strategy to use in this pool.
+ */
+ workerChoiceStrategy?: WorkerChoiceStrategy
}
-/**
- * Internal poolifier pool emitter.
- */
-class PoolEmitter extends EventEmitter {}
-
/**
* Base class containing some shared logic for all poolifier pools.
*
Worker extends IWorker,
Data = unknown,
Response = unknown
-> implements IPool<Data, Response> {
- /**
- * List of currently available workers.
- */
+> implements IPoolInternal<Worker, Data, Response> {
+ /** @inheritdoc */
public readonly workers: Worker[] = []
- /**
- * Index for the next worker.
- */
- public nextWorkerIndex: number = 0
-
- /**
- * The tasks map.
- *
- * - `key`: The `Worker`
- * - `value`: Number of tasks currently in progress on the worker.
- */
+ /** @inheritdoc */
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
- /**
- * Emitter on which events can be listened to.
- *
- * Events that can currently be listened to:
- *
- * - `'FullPool'`
- */
+ /** @inheritdoc */
public readonly emitter: PoolEmitter
/**
*/
protected nextMessageId: number = 0
+ /**
+ * Worker choice strategy instance implementing the worker choice algorithm.
+ *
+ * Default to a strategy implementing a round robin algorithm.
+ */
+ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >
+
/**
* Constructs a new poolifier pool.
*
}
this.emitter = new PoolEmitter()
+ this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ this,
+ opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ )
}
- private checkFilePath (filePath: string) {
+ private checkFilePath (filePath: string): void {
if (!filePath) {
throw new Error('Please specify a file with a worker implementation')
}
}
- /**
- * Perform the task specified in the constructor with the data parameter.
- *
- * @param data The input for the specified task. This can only be serializable data.
- * @returns Promise that will be resolved when the task is successfully completed.
- */
+ /** @inheritdoc */
+ public isDynamic (): boolean {
+ return false
+ }
+
+ /** @inheritdoc */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /** @inheritdoc */
public execute (data: Data): Promise<Response> {
// Configure worker to handle message with the specified task
const worker = this.chooseWorker()
return res
}
- /**
- * Shut down every current worker in this pool.
- */
+ /** @inheritdoc */
public async destroy (): Promise<void> {
await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
}
- /**
- * Shut down given worker.
- *
- * @param worker A worker within `workers`.
- */
- protected abstract destroyWorker (worker: Worker): void | Promise<void>
+ /** @inheritdoc */
+ public abstract destroyWorker (worker: Worker): void | Promise<void>
/**
* Setup hook that can be overridden by a Poolifier pool implementation
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
*/
- private stepWorkerNumberOfTasks (worker: Worker, step: number) {
+ private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
const numberOfTasksInProgress = this.tasks.get(worker)
if (numberOfTasksInProgress !== undefined) {
this.tasks.set(worker, numberOfTasksInProgress + step)
* @returns Worker.
*/
protected chooseWorker (): Worker {
- const chosenWorker = this.workers[this.nextWorkerIndex]
- this.nextWorkerIndex =
- this.workers.length - 1 === this.nextWorkerIndex
- ? 0
- : this.nextWorkerIndex + 1
- return chosenWorker
+ return this.workerChoiceStrategyContext.execute()
}
/**
message: MessageValue<Data>
): void
- protected abstract registerWorkerMessageListener<
+ /** @inheritdoc */
+ public abstract registerWorkerMessageListener<
Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
*/
protected abstract afterWorkerSetup (worker: Worker): void
- /**
- * Creates a new worker for this pool and sets it up completely.
- *
- * @returns New, completely set up worker.
- */
- protected createAndSetupWorker (): Worker {
+ /** @inheritdoc */
+ public createAndSetupWorker (): Worker {
const worker: Worker = this.createWorker()
- worker.on('error', this.opts.errorHandler ?? emptyFunction)
- worker.on('online', this.opts.onlineHandler ?? emptyFunction)
- worker.on('exit', this.opts.exitHandler ?? emptyFunction)
+ worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+ worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
+ worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => this.removeWorker(worker))
this.workers.push(worker)