public readonly workers: Worker[] = []
/**
- * ID for the next worker.
+ * Index for the next worker.
*/
- public nextWorker: number = 0
+ public nextWorkerIndex: number = 0
/**
* - `key`: The `Worker`
/**
* ID of the next message.
*/
- protected id: number = 0
+ protected nextMessageId: number = 0
/**
* Constructs a new poolifier pool.
this.setupHook()
for (let i = 1; i <= this.numberOfWorkers; i++) {
- this.internalNewWorker()
+ this.createAndSetupWorker()
}
this.emitter = new PoolEmitter()
}
/**
- * Setup hook that can be overridden by a Poolifier pool implementation
- * to run code before workers are created in the abstract constructor.
+ * Index for the next worker.
+ *
+ * @returns Index for the next worker.
+ * @deprecated Only here for backward compatibility.
*/
- protected setupHook (): void {
- // Can be overridden
+ public get nextWorker (): number {
+ return this.nextWorkerIndex
}
- /**
- * Should return whether the worker is the main worker or not.
- */
- protected abstract isMain (): boolean
+ public execute (data: Data): Promise<Response> {
+ // Configure worker to handle message with the specified task
+ const worker = this.chooseWorker()
+ this.increaseWorkersTask(worker)
+ const messageId = ++this.nextMessageId
+ const res = this.internalExecute(worker, messageId)
+ this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
+ return res
+ }
public async destroy (): Promise<void> {
for (const worker of this.workers) {
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
- * Send a message to the given worker.
- *
- * @param worker The worker which should receive the message.
- * @param message The message.
+ * Setup hook that can be overridden by a Poolifier pool implementation
+ * to run code before workers are created in the abstract constructor.
*/
- protected abstract sendToWorker (
- worker: Worker,
- message: MessageValue<Data>
- ): void
+ protected setupHook (): void {
+ // Can be overridden
+ }
/**
- * Adds the given worker to the pool.
+ * Should return whether the worker is the main worker or not.
+ */
+ protected abstract isMain (): boolean
+
+ /**
+ * Increase the number of tasks that the given workers has done.
*
- * @param worker Worker that will be added.
+ * @param worker Workers whose tasks are increased.
*/
- protected addWorker (worker: Worker): void {
- const previousWorkerIndex = this.tasks.get(worker)
- if (previousWorkerIndex !== undefined) {
- this.tasks.set(worker, previousWorkerIndex + 1)
+ protected increaseWorkersTask (worker: Worker): void {
+ const numberOfTasksTheWorkerHas = this.tasks.get(worker)
+ if (numberOfTasksTheWorkerHas !== undefined) {
+ this.tasks.set(worker, numberOfTasksTheWorkerHas + 1)
} else {
throw Error('Worker could not be found in tasks map')
}
this.tasks.delete(worker)
}
- public execute (data: Data): Promise<Response> {
- // Configure worker to handle message with the specified task
- const worker = this.chooseWorker()
- this.addWorker(worker)
- const id = ++this.id
- const res = this.internalExecute(worker, id)
- this.sendToWorker(worker, { data: data || ({} as Data), id: id })
- return res
+ /**
+ * Choose a worker for the next task.
+ *
+ * The default implementation uses a round robin algorithm to distribute the load.
+ *
+ * @returns Worker.
+ */
+ protected chooseWorker (): Worker {
+ const chosenWorker = this.workers[this.nextWorkerIndex]
+ this.nextWorkerIndex++
+ this.nextWorkerIndex %= this.workers.length
+ return chosenWorker
}
+ /**
+ * Send a message to the given worker.
+ *
+ * @param worker The worker which should receive the message.
+ * @param message The message.
+ */
+ protected abstract sendToWorker (
+ worker: Worker,
+ message: MessageValue<Data>
+ ): void
+
protected abstract registerWorkerMessageListener (
port: Worker,
listener: (message: MessageValue<Response>) => void
listener: (message: MessageValue<Response>) => void
): void
- protected internalExecute (worker: Worker, id: number): Promise<Response> {
+ protected internalExecute (
+ worker: Worker,
+ messageId: number
+ ): Promise<Response> {
return new Promise((resolve, reject) => {
const listener: (message: MessageValue<Response>) => void = message => {
- if (message.id === id) {
+ if (message.id === messageId) {
this.unregisterWorkerMessageListener(worker, listener)
- this.addWorker(worker)
+ this.increaseWorkersTask(worker)
if (message.error) reject(message.error)
else resolve(message.data as Response)
}
})
}
- /**
- * Choose a worker for the next task.
- *
- * The default implementation uses a round robin algorithm to distribute the load.
- *
- * @returns Worker.
- */
- protected chooseWorker (): Worker {
- this.nextWorker =
- this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1
- return this.workers[this.nextWorker]
- }
-
/**
* Returns a newly created worker.
*/
- protected abstract newWorker (): Worker
+ protected abstract createWorker (): Worker
/**
* Function that can be hooked up when a worker has been newly created and moved to the workers registry.
*
* @param worker The newly created worker.
*/
- protected abstract afterNewWorkerPushed (worker: Worker): void
+ protected abstract afterWorkerSetup (worker: Worker): void
/**
* Creates a new worker for this pool and sets it up completely.
*
* @returns New, completely set up worker.
*/
- protected internalNewWorker (): Worker {
- const worker: Worker = this.newWorker()
+ protected createAndSetupWorker (): Worker {
+ const worker: Worker = this.createWorker()
+
worker.on('error', this.opts.errorHandler ?? (() => {}))
worker.on('online', this.opts.onlineHandler ?? (() => {}))
// TODO handle properly when a worker exit
worker.on('exit', this.opts.exitHandler ?? (() => {}))
+
this.workers.push(worker)
- this.afterNewWorkerPushed(worker)
- // init tasks map
+
+ // Init tasks map
this.tasks.set(worker, 0)
+
+ this.afterWorkerSetup(worker)
+
return worker
}
}