X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f1392edd5124e25421531fec9958a2fea610bf59;hb=3d6dd312a7825521cce506ebb7443bae36a111e6;hp=812a2087419de6df8d457bd5d875c2d2b5f9f179;hpb=19dbc45b0e2975f938aaae8274902ebe82c48cad;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 812a2087..f1392edd 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,5 +1,6 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' +import { existsSync } from 'node:fs' import type { MessageValue, PromiseResponseWrapper, @@ -137,10 +138,14 @@ export abstract class AbstractPool< private checkFilePath (filePath: string): void { if ( filePath == null || + typeof filePath !== 'string' || (typeof filePath === 'string' && filePath.trim().length === 0) ) { throw new Error('Please specify a file with a worker implementation') } + if (!existsSync(filePath)) { + throw new Error(`Cannot find the worker file '${filePath}'`) + } } private checkNumberOfWorkers (numberOfWorkers: number): void { @@ -426,7 +431,7 @@ export abstract class AbstractPool< * @returns The pool utilization. */ private get utilization (): number { - const poolRunTimeCapacity = + const poolTimeCapacity = (performance.now() - this.startTimestamp) * this.maxSize const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => @@ -438,7 +443,7 @@ export abstract class AbstractPool< accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), 0 ) - return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity + return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity } /** @@ -474,6 +479,12 @@ export abstract class AbstractPool< ?.worker } + /** + * Checks if the worker id sent in the received message from a worker is valid. + * + * @param message - The received message. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. + */ private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && @@ -898,19 +909,6 @@ export abstract class AbstractPool< message: MessageValue ): void - /** - * Registers a listener callback on the given worker. - * - * @param worker - The worker which should register a listener. - * @param listener - The message listener callback. - */ - private registerWorkerMessageListener( - worker: Worker, - listener: (message: MessageValue) => void - ): void { - worker.on('message', listener as MessageHandler) - } - /** * Creates a new worker. * @@ -918,24 +916,6 @@ export abstract class AbstractPool< */ protected abstract createWorker (): Worker - /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. - * Can be overridden. - * - * @param worker - The newly created worker. - */ - protected afterWorkerSetup (worker: Worker): void { - // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) - // Send startup message to worker. - this.sendToWorker(worker, { - ready: false, - workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number - }) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) - } - /** * Creates a new worker and sets it up completely in the pool worker nodes. * @@ -947,18 +927,21 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', error => { + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) + workerInfo.ready = false if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } if (this.opts.restartWorkerOnError === true && !this.starting) { - if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { + if (workerInfo.dynamic) { this.createAndSetupDynamicWorker() } else { this.createAndSetupWorker() } } if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(worker) + this.redistributeQueuedTasks(workerNodeKey) } }) worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) @@ -974,34 +957,6 @@ export abstract class AbstractPool< return worker } - private redistributeQueuedTasks (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) - while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break - } - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued < minQueuedTasks - ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId - } - } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } - } - /** * Creates a new dynamic worker and sets it up completely in the pool worker nodes. * @@ -1033,6 +988,67 @@ export abstract class AbstractPool< return worker } + /** + * Registers a listener callback on the given worker. + * + * @param worker - The worker which should register a listener. + * @param listener - The message listener callback. + */ + private registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void { + worker.on('message', listener as MessageHandler) + } + + /** + * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Can be overridden. + * + * @param worker - The newly created worker. + */ + protected afterWorkerSetup (worker: Worker): void { + // Listen to worker messages. + this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) + } + + private redistributeQueuedTasks (workerNodeKey: number): void { + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + const workerInfo = this.getWorkerInfo(workerNodeId) + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } + /** * This function is the listener registered for each worker message. *