From 280c2a7728fbeb53612d8bc115a295d0255dd991 Mon Sep 17 00:00:00 2001 From: Shinigami Date: Sat, 13 Feb 2021 19:28:44 +0100 Subject: [PATCH] Improve property namings (#145) --- CHANGELOG.md | 11 ++- src/pools/abstract-pool.ts | 131 ++++++++++++++++++++--------------- src/pools/cluster/dynamic.ts | 2 +- src/pools/cluster/fixed.ts | 4 +- src/pools/pool.ts | 8 +-- src/pools/thread/dynamic.ts | 6 +- src/pools/thread/fixed.ts | 6 +- 7 files changed, 95 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec50e05d..6e8e90a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,13 +34,18 @@ _This is not a limitation by poolifier but NodeJS._ #### Public properties renaming - Thread Pool's `numWorkers` is now `numberOfWorkers` +- Thread Pool's `nextWorker` is now `nextWorkerIndex` -#### Internal (protected) methods renaming +#### Internal (protected) properties and methods renaming -Those methods are not intended to be used from final users +These properties are not intended for end users + +- `id` => `nextMessageId` + +These methods are not intended for end users - `_chooseWorker` => `chooseWorker` -- `_newWorker` => `newWorker` +- `_newWorker` => `createWorker` - `_execute` => `internalExecute` - `_chooseWorker` => `chooseWorker` - `_checkAlive` => `checkAlive` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b3198af3..6e4465ed 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -76,9 +76,9 @@ export abstract class AbstractPool< public readonly workers: Worker[] = [] /** - * ID for the next worker. + * Index for the next worker. */ - public nextWorker: number = 0 + public nextWorkerIndex: number = 0 /** * - `key`: The `Worker` @@ -98,7 +98,7 @@ export abstract class AbstractPool< /** * ID of the next message. */ - protected id: number = 0 + protected nextMessageId: number = 0 /** * Constructs a new poolifier pool. @@ -123,7 +123,7 @@ export abstract class AbstractPool< this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { - this.internalNewWorker() + this.createAndSetupWorker() } this.emitter = new PoolEmitter() @@ -141,17 +141,24 @@ export abstract class AbstractPool< } /** - * Setup hook that can be overridden by a Poolifier pool implementation - * to run code before workers are created in the abstract constructor. + * Index for the next worker. + * + * @returns Index for the next worker. + * @deprecated Only here for backward compatibility. */ - protected setupHook (): void { - // Can be overridden + public get nextWorker (): number { + return this.nextWorkerIndex } - /** - * Should return whether the worker is the main worker or not. - */ - protected abstract isMain (): boolean + public execute (data: Data): Promise { + // Configure worker to handle message with the specified task + const worker = this.chooseWorker() + this.increaseWorkersTask(worker) + const messageId = ++this.nextMessageId + const res = this.internalExecute(worker, messageId) + this.sendToWorker(worker, { data: data || ({} as Data), id: messageId }) + return res + } public async destroy (): Promise { for (const worker of this.workers) { @@ -167,25 +174,27 @@ export abstract class AbstractPool< protected abstract destroyWorker (worker: Worker): void | Promise /** - * Send a message to the given worker. - * - * @param worker The worker which should receive the message. - * @param message The message. + * Setup hook that can be overridden by a Poolifier pool implementation + * to run code before workers are created in the abstract constructor. */ - protected abstract sendToWorker ( - worker: Worker, - message: MessageValue - ): void + protected setupHook (): void { + // Can be overridden + } /** - * Adds the given worker to the pool. + * Should return whether the worker is the main worker or not. + */ + protected abstract isMain (): boolean + + /** + * Increase the number of tasks that the given workers has done. * - * @param worker Worker that will be added. + * @param worker Workers whose tasks are increased. */ - protected addWorker (worker: Worker): void { - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) + protected increaseWorkersTask (worker: Worker): void { + const numberOfTasksTheWorkerHas = this.tasks.get(worker) + if (numberOfTasksTheWorkerHas !== undefined) { + this.tasks.set(worker, numberOfTasksTheWorkerHas + 1) } else { throw Error('Worker could not be found in tasks map') } @@ -203,16 +212,31 @@ export abstract class AbstractPool< this.tasks.delete(worker) } - public execute (data: Data): Promise { - // Configure worker to handle message with the specified task - const worker = this.chooseWorker() - this.addWorker(worker) - const id = ++this.id - const res = this.internalExecute(worker, id) - this.sendToWorker(worker, { data: data || ({} as Data), id: id }) - return res + /** + * Choose a worker for the next task. + * + * The default implementation uses a round robin algorithm to distribute the load. + * + * @returns Worker. + */ + protected chooseWorker (): Worker { + const chosenWorker = this.workers[this.nextWorkerIndex] + this.nextWorkerIndex++ + this.nextWorkerIndex %= this.workers.length + return chosenWorker } + /** + * Send a message to the given worker. + * + * @param worker The worker which should receive the message. + * @param message The message. + */ + protected abstract sendToWorker ( + worker: Worker, + message: MessageValue + ): void + protected abstract registerWorkerMessageListener ( port: Worker, listener: (message: MessageValue) => void @@ -223,12 +247,15 @@ export abstract class AbstractPool< listener: (message: MessageValue) => void ): void - protected internalExecute (worker: Worker, id: number): Promise { + protected internalExecute ( + worker: Worker, + messageId: number + ): Promise { return new Promise((resolve, reject) => { const listener: (message: MessageValue) => void = message => { - if (message.id === id) { + if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.addWorker(worker) + this.increaseWorkersTask(worker) if (message.error) reject(message.error) else resolve(message.data as Response) } @@ -237,23 +264,10 @@ export abstract class AbstractPool< }) } - /** - * Choose a worker for the next task. - * - * The default implementation uses a round robin algorithm to distribute the load. - * - * @returns Worker. - */ - protected chooseWorker (): Worker { - this.nextWorker = - this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1 - return this.workers[this.nextWorker] - } - /** * Returns a newly created worker. */ - protected abstract newWorker (): Worker + protected abstract createWorker (): Worker /** * Function that can be hooked up when a worker has been newly created and moved to the workers registry. @@ -262,23 +276,28 @@ export abstract class AbstractPool< * * @param worker The newly created worker. */ - protected abstract afterNewWorkerPushed (worker: Worker): void + protected abstract afterWorkerSetup (worker: Worker): void /** * Creates a new worker for this pool and sets it up completely. * * @returns New, completely set up worker. */ - protected internalNewWorker (): Worker { - const worker: Worker = this.newWorker() + protected createAndSetupWorker (): Worker { + const worker: Worker = this.createWorker() + worker.on('error', this.opts.errorHandler ?? (() => {})) worker.on('online', this.opts.onlineHandler ?? (() => {})) // TODO handle properly when a worker exit worker.on('exit', this.opts.exitHandler ?? (() => {})) + this.workers.push(worker) - this.afterNewWorkerPushed(worker) - // init tasks map + + // Init tasks map this.tasks.set(worker, 0) + + this.afterWorkerSetup(worker) + return worker } } diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 3d523890..6d665cef 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -63,7 +63,7 @@ export class DynamicClusterPool< return super.chooseWorker() } // All workers are busy, create a new worker - const worker = this.internalNewWorker() + const worker = this.createAndSetupWorker() worker.on('message', (message: MessageValue) => { if (message.kill) { this.sendToWorker(worker, { kill: 1 }) diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 601ee532..0d8021eb 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -81,11 +81,11 @@ export class FixedClusterPool< port.removeListener('message', listener) } - protected newWorker (): Worker { + protected createWorker (): Worker { return fork(this.opts.env) } - protected afterNewWorkerPushed (worker: Worker): void { + protected afterWorkerSetup (worker: Worker): void { // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.setMaxListeners(this.opts.maxTasks ?? 1000) diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 55e15792..f96e5c0c 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -5,10 +5,6 @@ * @template Response Type of response of execution. */ export interface IPool { - /** - * Shut down every current worker in this pool. - */ - destroy(): Promise /** * Perform the task specified in the constructor with the data parameter. * @@ -16,4 +12,8 @@ export interface IPool { * @returns Promise that will be resolved when the task is successfully completed. */ execute(data: Data): Promise + /** + * Shut down every current worker in this pool. + */ + destroy(): Promise } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index c8e93850..c1033315 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -55,15 +55,15 @@ export class DynamicThreadPool< } if (worker) { - // a worker is free, use it + // A worker is free, use it return worker } else { if (this.workers.length === this.max) { this.emitter.emit('FullPool') return super.chooseWorker() } - // all workers are busy create a new worker - const worker = this.internalNewWorker() + // All workers are busy, create a new worker + const worker = this.createAndSetupWorker() worker.port2?.on('message', (message: MessageValue) => { if (message.kill) { this.sendToWorker(worker, { kill: 1 }) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index ee3d63c1..74c14ff2 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -72,15 +72,13 @@ export class FixedThreadPool< port.port2?.removeListener('message', listener) } - protected newWorker (): ThreadWorkerWithMessageChannel { + protected createWorker (): ThreadWorkerWithMessageChannel { return new Worker(this.filePath, { env: SHARE_ENV }) } - protected afterNewWorkerPushed ( - worker: ThreadWorkerWithMessageChannel - ): void { + protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void { const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) worker.port1 = port1 -- 2.34.1