X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f1392edd5124e25421531fec9958a2fea610bf59;hb=3d6dd312a7825521cce506ebb7443bae36a111e6;hp=4bf7785d7eb4813dc9a56e63239db29f29de770b;hpb=9b1068374b1a52479b07e1e22b692289d5579237;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4bf7785d..f1392edd 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,5 +1,6 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' +import { existsSync } from 'node:fs' import type { MessageValue, PromiseResponseWrapper, @@ -137,10 +138,14 @@ export abstract class AbstractPool< private checkFilePath (filePath: string): void { if ( filePath == null || + typeof filePath !== 'string' || (typeof filePath === 'string' && filePath.trim().length === 0) ) { throw new Error('Please specify a file with a worker implementation') } + if (!existsSync(filePath)) { + throw new Error(`Cannot find the worker file '${filePath}'`) + } } private checkNumberOfWorkers (numberOfWorkers: number): void { @@ -904,19 +909,6 @@ export abstract class AbstractPool< message: MessageValue ): void - /** - * Registers a listener callback on the given worker. - * - * @param worker - The worker which should register a listener. - * @param listener - The message listener callback. - */ - private registerWorkerMessageListener( - worker: Worker, - listener: (message: MessageValue) => void - ): void { - worker.on('message', listener as MessageHandler) - } - /** * Creates a new worker. * @@ -924,24 +916,6 @@ export abstract class AbstractPool< */ protected abstract createWorker (): Worker - /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. - * Can be overridden. - * - * @param worker - The newly created worker. - */ - protected afterWorkerSetup (worker: Worker): void { - // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) - // Send startup message to worker. - this.sendToWorker(worker, { - ready: false, - workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number - }) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) - } - /** * Creates a new worker and sets it up completely in the pool worker nodes. * @@ -983,33 +957,6 @@ export abstract class AbstractPool< return worker } - private redistributeQueuedTasks (workerNodeKey: number): void { - while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break - } - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued < minQueuedTasks - ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId - } - } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } - } - /** * Creates a new dynamic worker and sets it up completely in the pool worker nodes. * @@ -1041,6 +988,67 @@ export abstract class AbstractPool< return worker } + /** + * Registers a listener callback on the given worker. + * + * @param worker - The worker which should register a listener. + * @param listener - The message listener callback. + */ + private registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void { + worker.on('message', listener as MessageHandler) + } + + /** + * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Can be overridden. + * + * @param worker - The newly created worker. + */ + protected afterWorkerSetup (worker: Worker): void { + // Listen to worker messages. + this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) + } + + private redistributeQueuedTasks (workerNodeKey: number): void { + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + const workerInfo = this.getWorkerInfo(workerNodeId) + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } + /** * This function is the listener registered for each worker message. *