From: Jérôme Benoit Date: Sat, 1 Jul 2023 22:05:41 +0000 (+0200) Subject: Merge branch 'master' into worker-info X-Git-Tag: v2.6.7~16^2~5 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=53b1b2fc0c935c958a4b6a36223dbdc44c24d926;hp=64383951fbf11fdf6a804fc6d081635fb925e403;p=poolifier.git Merge branch 'master' into worker-info --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0f18de3f..f931a466 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -20,7 +20,8 @@ import { type PoolType, PoolTypes, type TasksQueueOptions, - type WorkerType + type WorkerType, + WorkerTypes } from './pool' import type { IWorker, @@ -244,6 +245,14 @@ export abstract class AbstractPool< } } + private get starting (): boolean { + return this.workerNodes.some(workerNode => !workerNode.info.started) + } + + private get started (): boolean { + return this.workerNodes.some(workerNode => workerNode.info.started) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -344,6 +353,17 @@ export abstract class AbstractPool< */ protected abstract get maxSize (): number + /** + * Get the worker given its id. + * + * @param workerId - The worker id. + * @returns The worker if found in the pool worker nodes, `undefined` otherwise. + */ + private getWorkerById (workerId: number): Worker | undefined { + return this.workerNodes.find(workerNode => workerNode.info.id === workerId) + ?.worker + } + /** * Gets the given worker its worker node key. * @@ -750,7 +770,7 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.restartWorkerOnError === true) { + if (this.opts.restartWorkerOnError === true && !this.starting) { this.createAndSetupWorker() } }) @@ -801,7 +821,16 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.id != null) { + if (message.workerId != null && message.started != null) { + // Worker started message received + const worker = this.getWorkerById(message.workerId) + if (worker != null) { + this.workerNodes[this.getWorkerNodeKey(worker)].info.started = + message.started + } else { + throw new Error('Worker started message received from unknown worker') + } + } else if (message.id != null) { // Task execution response received const promiseResponse = this.promiseResponseMap.get(message.id) if (promiseResponse != null) { @@ -864,6 +893,7 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { this.workerNodes.push({ worker, + info: { id: this.getWorkerId(worker), started: false }, usage: this.getWorkerUsage(), tasksQueue: new Queue>() }) @@ -875,22 +905,39 @@ export abstract class AbstractPool< return this.workerNodes.length } + /** + * Gets the worker id. + * + * @param worker - The worker. + * @returns The worker id. + */ + private getWorkerId (worker: Worker): number | undefined { + if (this.worker === WorkerTypes.thread) { + return worker.threadId + } else if (this.worker === WorkerTypes.cluster) { + return worker.id + } + } + // /** // * Sets the given worker in the pool worker nodes. // * // * @param workerNodeKey - The worker node key. // * @param worker - The worker. + // * @param workerInfo - The worker info. // * @param workerUsage - The worker usage. // * @param tasksQueue - The worker task queue. // */ // private setWorkerNode ( // workerNodeKey: number, // worker: Worker, + // workerInfo: WorkerInfo, // workerUsage: WorkerUsage, // tasksQueue: Queue> // ): void { // this.workerNodes[workerNodeKey] = { // worker, + // info: workerInfo, // usage: workerUsage, // tasksQueue // } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index dc283e0e..664b6a76 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -118,6 +118,22 @@ export interface TaskStatistics { failed: number } +/** + * Worker information. + * + * @internal + */ +export interface WorkerInfo { + /** + * Worker id. + */ + id: number | undefined + /** + * Started flag. + */ + started: boolean +} + /** * Worker usage statistics. * @@ -146,6 +162,11 @@ export interface WorkerUsage { * Worker interface. */ export interface IWorker { + /** + * Worker id. + */ + readonly id?: number + readonly threadId?: number /** * Registers an event listener. * @@ -177,6 +198,10 @@ export interface WorkerNode { * Worker node worker. */ readonly worker: Worker + /** + * Worker node worker info. + */ + info: WorkerInfo /** * Worker node worker usage statistics. */ diff --git a/src/utility-types.ts b/src/utility-types.ts index 1eb8df50..0443b4ce 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -53,6 +53,10 @@ export interface WorkerStatistics { */ export interface MessageValue extends Task { + /** + * Worker id. + */ + readonly workerId?: number /** * Kill code. */ @@ -69,6 +73,10 @@ export interface MessageValue * Whether the worker computes the given statistics or not. */ readonly statistics?: WorkerStatistics + /** + * Whether the worker has started or not. + */ + readonly started?: boolean } /** diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 2a027269..6e17c9ed 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -36,6 +36,10 @@ export abstract class AbstractWorker< Data = unknown, Response = unknown > extends AsyncResource { + /** + * Worker id. + */ + protected abstract id: number /** * Task function(s) processed by the worker when the pool's `execution` function is invoked. */ @@ -225,6 +229,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) } catch (e) { @@ -234,6 +239,7 @@ export abstract class AbstractWorker< message: err, data: message.data }, + workerId: this.id, id: message.id }) } finally { @@ -258,6 +264,7 @@ export abstract class AbstractWorker< this.sendToMainWorker({ data: res, taskPerformance, + workerId: this.id, id: message.id }) return null @@ -269,6 +276,7 @@ export abstract class AbstractWorker< message: err, data: message.data }, + workerId: this.id, id: message.id }) }) diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 13735b1d..16dddd29 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -41,10 +41,19 @@ export class ClusterWorker< cluster.worker as Worker, opts ) + if (!this.isMain) { + this.sendToMainWorker({ workerId: this.id, started: true }) + } + } + + /** @inheritDoc */ + protected get id (): number { + return this.getMainWorker().id } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { + console.log('sending message to main worker(cluster)', message) this.getMainWorker().send(message) } diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index b6573a97..7a766a95 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -1,4 +1,9 @@ -import { type MessagePort, isMainThread, parentPort } from 'node:worker_threads' +import { + type MessagePort, + isMainThread, + parentPort, + threadId +} from 'node:worker_threads' import type { MessageValue } from '../utility-types' import { AbstractWorker } from './abstract-worker' import type { WorkerOptions } from './worker-options' @@ -41,10 +46,18 @@ export class ThreadWorker< parentPort as MessagePort, opts ) + if (!this.isMain) { + this.sendToMainWorker({ workerId: this.id, started: true }) + } + } + + protected get id (): number { + return threadId } /** @inheritDoc */ protected sendToMainWorker (message: MessageValue): void { + console.log('sending message to main worker(thread)', message) this.getMainWorker().postMessage(message) } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 023b56b4..7c42d34a 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -407,6 +407,9 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: 0, failedTasks: 0 }) + for (const workerNode of pool.workerNodes) { + console.log('thread:workerNode.info', workerNode.info) + } await pool.destroy() pool = new DynamicClusterPool( numberOfWorkers, @@ -428,6 +431,9 @@ describe('Abstract pool test suite', () => { maxQueuedTasks: 0, failedTasks: 0 }) + for (const workerNode of pool.workerNodes) { + console.log('cluster:workerNode.info', workerNode.info) + } await pool.destroy() })