X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=b3ebc19483d95e7c5f9863870e85823f976de157;hb=f4ff1ce25e8ec840112e33e306e492b063738e6d;hp=1d4e7ae0d07f18ede013a5fefda5f7430302584c;hpb=ffcbbad84f63b8a77f2b1a08f82deef5430f646e;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1d4e7ae0..b3ebc194 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,3 +1,4 @@ +import crypto from 'node:crypto' import type { MessageValue, PromiseWorkerResponseWrapper @@ -50,14 +51,9 @@ export abstract class AbstractPool< * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message. */ protected promiseMap: Map< - number, + string, PromiseWorkerResponseWrapper - > = new Map>() - - /** - * Id of the next message. - */ - protected nextMessageId: number = 0 + > = new Map>() /** * Worker choice strategy instance implementing the worker choice algorithm. @@ -158,7 +154,7 @@ export abstract class AbstractPool< } /** - * Gets worker key. + * Gets the given worker key. * * @param worker - The worker. * @returns The worker key. @@ -169,14 +165,17 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public getWorkerRunningTasks (worker: Worker): number | undefined { - return this.workers.get(this.getWorkerKey(worker) as number)?.tasksUsage - ?.running + return this.getWorkerTasksUsage(worker)?.running + } + + /** {@inheritDoc} */ + public getWorkerRunTasks (worker: Worker): number | undefined { + return this.getWorkerTasksUsage(worker)?.run } /** {@inheritDoc} */ public getWorkerAverageTasksRunTime (worker: Worker): number | undefined { - return this.workers.get(this.getWorkerKey(worker) as number)?.tasksUsage - ?.avgRunTime + return this.getWorkerTasksUsage(worker)?.avgRunTime } /** {@inheritDoc} */ @@ -220,16 +219,15 @@ export abstract class AbstractPool< /** {@inheritDoc} */ public async execute (data: Data): Promise { - // Configure worker to handle message with the specified task const worker = this.chooseWorker() - const res = this.internalExecute(worker, this.nextMessageId) + const messageId = crypto.randomUUID() + const res = this.internalExecute(worker, messageId) this.checkAndEmitBusy() this.sendToWorker(worker, { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), - id: this.nextMessageId + id: messageId }) - ++this.nextMessageId // eslint-disable-next-line @typescript-eslint/return-await return res } @@ -398,7 +396,7 @@ export abstract class AbstractPool< private async internalExecute ( worker: Worker, - messageId: number + messageId: string ): Promise { this.beforePromiseWorkerResponseHook(worker) return await new Promise((resolve, reject) => { @@ -431,7 +429,7 @@ export abstract class AbstractPool< } /** - * Get tasks usage of the given worker. + * Gets tasks usage of the given worker. * * @param worker - Worker which tasks usage is returned. */