X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=2a52d5486984133e620552877e90921d50d96190;hb=73dc166d705728ff26331592a0dc0081dc594761;hp=00bfaf8eea87f991221664e7cea3059cca4f5457;hpb=501aea93c14789538c4e221fbab2f8afb80f4cf9;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 00bfaf8e..2a52d548 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import { existsSync } from 'node:fs' +import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, @@ -105,7 +106,9 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } this.checkNumberOfWorkers(this.numberOfWorkers) this.checkFilePath(this.filePath) @@ -114,6 +117,7 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) + this.dequeueTask = this.dequeueTask.bind(this) this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { @@ -171,13 +175,21 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (min > max) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { + throw new TypeError( + 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' + ) + } else if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size' ) } else if (max === 0) { throw new RangeError( - 'Cannot instantiate a dynamic pool with a pool size equal to zero' + 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero' ) } else if (min === max) { throw new RangeError( @@ -284,7 +296,7 @@ export abstract class AbstractPool< 0 ) < this.numberOfWorkers ) { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } @@ -325,16 +337,20 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.executing, 0 ), - queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.queued, - 0 - ), - maxQueuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), - 0 - ), + ...(this.opts.enableTasksQueue === true && { + queuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + workerNode.usage.tasks.queued, + 0 + ) + }), + ...(this.opts.enableTasksQueue === true && { + maxQueuedTasks: this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + 0 + ) + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -346,14 +362,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -374,7 +390,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + (workerNode) => workerNode.usage.runTime?.median ?? 0 ) ) ) @@ -387,14 +403,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -415,7 +431,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + (workerNode) => workerNode.usage.waitTime?.median ?? 0 ) ) ) @@ -425,6 +441,9 @@ export abstract class AbstractPool< } } + /** + * The pool readiness boolean status. + */ private get ready (): boolean { return ( this.workerNodes.reduce( @@ -438,7 +457,7 @@ export abstract class AbstractPool< } /** - * Gets the approximate pool utilization. + * The approximate pool utilization. * * @returns The pool utilization. */ @@ -459,38 +478,27 @@ export abstract class AbstractPool< } /** - * Pool type. + * The pool type. * * If it is `'dynamic'`, it provides the `max` property. */ protected abstract get type (): PoolType /** - * Gets the worker type. + * The worker type. */ protected abstract get worker (): WorkerType /** - * Pool minimum size. + * The pool minimum size. */ protected abstract get minSize (): number /** - * Pool maximum size. + * The pool maximum size. */ protected abstract get maxSize (): number - /** - * Get the worker given its id. - * - * @param workerId - The worker id. - * @returns The worker if found in the pool worker nodes, `undefined` otherwise. - */ - private getWorkerById (workerId: number): Worker | undefined { - return this.workerNodes.find(workerNode => workerNode.info.id === workerId) - ?.worker - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -500,7 +508,7 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && - this.getWorkerById(message.workerId) == null + this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` @@ -514,9 +522,21 @@ export abstract class AbstractPool< * @param worker - The worker. * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. */ - protected getWorkerNodeKey (worker: Worker): number { + private getWorkerNodeKeyByWorker (worker: Worker): number { + return this.workerNodes.findIndex( + (workerNode) => workerNode.worker === worker + ) + } + + /** + * Gets the worker node key given its worker id. + * + * @param workerId - The worker id. + * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise. + */ + private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + (workerNode) => workerNode.info.id === workerId ) } @@ -533,9 +553,9 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.setWorkerStatistics(workerNode.worker) + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } } @@ -598,46 +618,68 @@ export abstract class AbstractPool< protected abstract get busy (): boolean /** - * Whether worker nodes are executing at least one task. + * Whether worker nodes are executing concurrently their tasks quota or not. * * @returns Worker nodes busyness boolean status. */ protected internalBusy (): boolean { - return ( - this.workerNodes.findIndex(workerNode => { - return workerNode.usage.tasks.executing === 0 - }) === -1 - ) + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) === -1 + ) + } else { + return ( + this.workerNodes.findIndex( + (workerNode) => + workerNode.info.ready && workerNode.usage.tasks.executing === 0 + ) === -1 + ) + } } /** @inheritDoc */ - public async execute (data?: Data, name?: string): Promise { + public async execute ( + data?: Data, + name?: string, + transferList?: TransferListItem[] + ): Promise { return await new Promise((resolve, reject) => { + if (name != null && typeof name !== 'string') { + reject(new TypeError('name argument must be a string')) + } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + transferList, timestamp, workerId: this.getWorkerInfo(workerNodeKey).id as number, - id: randomUUID() + taskId: randomUUID() } - this.promiseResponseMap.set(task.id as string, { + this.promiseResponseMap.set(task.taskId as string, { resolve, reject, workerNodeKey }) if ( - this.opts.enableTasksQueue === true && - (this.busy || - this.workerNodes[workerNodeKey].usage.tasks.executing >= - ((this.opts.tasksQueueOptions as TasksQueueOptions) - .concurrency as number)) + this.opts.enableTasksQueue === false || + (this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number)) ) { - this.enqueueTask(workerNodeKey, task) - } else { this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) } this.checkAndEmitEvents() }) @@ -646,26 +688,35 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workerNodes.map(async (workerNode, workerNodeKey) => { - this.flushTasksQueue(workerNodeKey) - // FIXME: wait for tasks to be finished - const workerExitPromise = new Promise(resolve => { - workerNode.worker.on('exit', () => { - resolve() - }) - }) - await this.destroyWorker(workerNode.worker) - await workerExitPromise + this.workerNodes.map(async (_, workerNodeKey) => { + await this.destroyWorkerNode(workerNodeKey) }) ) } + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + const waitForKillResponse = new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error(`Worker ${workerId} kill message handling failed`)) + } + }) + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + await waitForKillResponse + } + /** - * Terminates the given worker. + * Terminates the worker node given its worker node key. * - * @param worker - A worker within `workerNodes`. + * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorker (worker: Worker): void | Promise + protected abstract destroyWorkerNode (workerNodeKey: number): Promise /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -802,15 +853,15 @@ export abstract class AbstractPool< * * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * - * @returns The worker node key + * @returns The chosen worker node key */ private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - const worker = this.createAndSetupDynamicWorker() + const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker ) { - return this.getWorkerNodeKey(worker) + return workerNodeKey } } return this.workerChoiceStrategyContext.execute() @@ -826,14 +877,16 @@ export abstract class AbstractPool< } /** - * Sends a message to the given worker. + * Sends a message to worker given its worker node key. * - * @param worker - The worker which should receive the message. + * @param workerNodeKey - The worker node key. * @param message - The message. + * @param transferList - The optional array of transferable objects. */ protected abstract sendToWorker ( - worker: Worker, - message: MessageValue + workerNodeKey: number, + message: MessageValue, + transferList?: TransferListItem[] ): void /** @@ -844,26 +897,26 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Creates a new worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up worker node. * - * @returns New, completely set up worker. + * @returns New, completely set up worker node key. */ - protected createAndSetupWorker (): Worker { + protected createAndSetupWorkerNode (): number { const worker = this.createWorker() worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKey(worker) + worker.on('error', (error) => { + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) const workerInfo = this.getWorkerInfo(workerNodeKey) workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) if (this.opts.restartWorkerOnError === true && !this.starting) { if (workerInfo.dynamic) { - this.createAndSetupDynamicWorker() + this.createAndSetupDynamicWorkerNode() } else { - this.createAndSetupWorker() + this.createAndSetupWorkerNode() } } if (this.opts.enableTasksQueue === true) { @@ -876,83 +929,110 @@ export abstract class AbstractPool< this.removeWorkerNode(worker) }) - this.addWorkerNode(worker) + const workerNodeKey = this.addWorkerNode(worker) - this.afterWorkerSetup(worker) + this.afterWorkerNodeSetup(workerNodeKey) - return worker + return workerNodeKey } /** - * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * Creates a new, completely set up dynamic worker node. * - * @returns New, completely set up dynamic worker. + * @returns New, completely set up dynamic worker node key. */ - protected createAndSetupDynamicWorker (): Worker { - const worker = this.createAndSetupWorker() - this.registerWorkerMessageListener(worker, message => { - const workerNodeKey = this.getWorkerNodeKey(worker) + protected createAndSetupDynamicWorkerNode (): number { + const workerNodeKey = this.createAndSetupWorkerNode() + this.registerWorkerMessageListener(workerNodeKey, (message) => { + const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( + message.workerId + ) + const workerUsage = this.workerNodes[localWorkerNodeKey].usage + // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0) || + workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0))) + workerUsage.tasks.executing === 0 && + this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void (this.destroyWorker(worker) as Promise) + this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) - const workerInfo = this.getWorkerInfoByWorker(worker) + const workerInfo = this.getWorkerInfo(workerNodeKey) + this.sendToWorker(workerNodeKey, { + checkActive: true, + workerId: workerInfo.id as number + }) workerInfo.dynamic = true if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { workerInfo.ready = true } - this.sendToWorker(worker, { - checkActive: true, - workerId: workerInfo.id as number - }) - return worker + return workerNodeKey } /** - * Registers a listener callback on the given worker. + * Registers a listener callback on the worker given its worker node key. * - * @param worker - The worker which should register a listener. + * @param workerNodeKey - The worker node key. * @param listener - The message listener callback. */ protected abstract registerWorkerMessageListener< Message extends Data | Response - >(worker: Worker, listener: (message: MessageValue) => void): void + >( + workerNodeKey: number, + listener: (message: MessageValue) => void + ): void /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Method hooked up after a worker node has been newly created. * Can be overridden. * - * @param worker - The newly created worker. + * @param workerNodeKey - The newly created worker node key. */ - protected afterWorkerSetup (worker: Worker): void { + protected afterWorkerNodeSetup (workerNodeKey: number): void { // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) + this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. - this.sendStartupMessageToWorker(worker) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) + this.sendStartupMessageToWorker(workerNodeKey) + // Send the worker statistics message to worker. + this.sendWorkerStatisticsMessageToWorker(workerNodeKey) } /** - * Sends the startup message to the given worker. + * Sends the startup message to worker given its worker node key. * - * @param worker - The worker which should receive the startup message. + * @param workerNodeKey - The worker node key. */ - protected abstract sendStartupMessageToWorker (worker: Worker): void + protected abstract sendStartupMessageToWorker (workerNodeKey: number): void + + /** + * Sends the worker statistics message to worker given its worker node key. + * + * @param workerNodeKey - The worker node key. + */ + private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + this.sendToWorker(workerNodeKey, { + statistics: { + runTime: + this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .runTime.aggregate, + elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + .elu.aggregate + }, + workerId: this.getWorkerInfo(workerNodeKey).id as number + }) + } private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { let targetWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity + let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { const workerInfo = this.getWorkerInfo(workerNodeId) if ( @@ -960,6 +1040,12 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } targetWorkerNodeKey = workerNodeId break } @@ -972,34 +1058,41 @@ export abstract class AbstractPool< targetWorkerNodeKey = workerNodeId } } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + if (executeTask) { + this.executeTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } else { + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } } } /** - * This function is the listener registered for each worker message. + * This method is the listener registered for each worker message. * * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return message => { + return (message) => { this.checkMessageWorkerId(message) if (message.ready != null) { - // Worker ready response received + // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.id != null) { - // Task execution response received + } else if (message.taskId != null) { + // Task execution response received from worker this.handleTaskExecutionResponse(message) } } } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfoByWorker( - this.getWorkerById(message.workerId) as Worker + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) ).ready = message.ready as boolean if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) @@ -1007,7 +1100,9 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get(message.id as string) + const promiseResponse = this.promiseResponseMap.get( + message.taskId as string + ) if (promiseResponse != null) { if (message.taskError != null) { this.emitter?.emit(PoolEvents.taskError, message.taskError) @@ -1017,10 +1112,12 @@ export abstract class AbstractPool< } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.id as string) + this.promiseResponseMap.delete(message.taskId as string) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 + this.tasksQueueSize(workerNodeKey) > 0 && + this.workerNodes[workerNodeKey].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) ) { this.executeTask( workerNodeKey, @@ -1043,27 +1140,12 @@ export abstract class AbstractPool< } /** - * Gets the worker information from the given worker node key. + * Gets the worker information given its worker node key. * * @param workerNodeKey - The worker node key. * @returns The worker information. */ - private getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info - } - - /** - * Gets the worker information from the given worker. - * - * @param worker - The worker. - * @returns The worker information. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found. - */ - protected getWorkerInfoByWorker (worker: Worker): WorkerInfo { - const workerNodeKey = this.getWorkerNodeKey(worker) - if (workerNodeKey === -1) { - throw new Error('Worker not found') - } + protected getWorkerInfo (workerNodeKey: number): WorkerInfo { return this.workerNodes[workerNodeKey].info } @@ -1071,7 +1153,8 @@ export abstract class AbstractPool< * Adds the given worker in the pool worker nodes. * * @param worker - The worker. - * @returns The worker nodes length. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { const workerNode = new WorkerNode(worker, this.worker) @@ -1079,7 +1162,12 @@ export abstract class AbstractPool< if (this.starting) { workerNode.info.ready = true } - return this.workerNodes.push(workerNode) + this.workerNodes.push(workerNode) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + if (workerNodeKey === -1) { + throw new Error('Worker node not found') + } + return workerNodeKey } /** @@ -1088,7 +1176,7 @@ export abstract class AbstractPool< * @param worker - The worker. */ private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) + const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1096,14 +1184,14 @@ export abstract class AbstractPool< } /** - * Executes the given task on the given worker. + * Executes the given task on the worker given its worker node key. * - * @param worker - The worker. + * @param workerNodeKey - The worker node key. * @param task - The task to execute. */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) + this.sendToWorker(workerNodeKey, task, task.transferList) } private enqueueTask (workerNodeKey: number, task: Task): number { @@ -1118,7 +1206,7 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - private flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, @@ -1133,17 +1221,4 @@ export abstract class AbstractPool< this.flushTasksQueue(workerNodeKey) } } - - private setWorkerStatistics (worker: Worker): void { - this.sendToWorker(worker, { - statistics: { - runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate - }, - workerId: this.getWorkerInfoByWorker(worker).id as number - }) - } }