repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
perf: take into account the number of run tasks in LRU worker choice
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 1d4e7ae0d07f18ede013a5fefda5f7430302584c..b3ebc19483d95e7c5f9863870e85823f976de157 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-1,3
+1,4
@@
+import crypto from 'node:crypto'
import type {
MessageValue,
PromiseWorkerResponseWrapper
import type {
MessageValue,
PromiseWorkerResponseWrapper
@@
-50,14
+51,9
@@
export abstract class AbstractPool<
* When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
*/
protected promiseMap: Map<
* When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
*/
protected promiseMap: Map<
-
number
,
+
string
,
PromiseWorkerResponseWrapper<Worker, Response>
PromiseWorkerResponseWrapper<Worker, Response>
- > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
-
- /**
- * Id of the next message.
- */
- protected nextMessageId: number = 0
+ > = new Map<string, PromiseWorkerResponseWrapper<Worker, Response>>()
/**
* Worker choice strategy instance implementing the worker choice algorithm.
/**
* Worker choice strategy instance implementing the worker choice algorithm.
@@
-158,7
+154,7
@@
export abstract class AbstractPool<
}
/**
}
/**
- * Gets worker key.
+ * Gets
the given
worker key.
*
* @param worker - The worker.
* @returns The worker key.
*
* @param worker - The worker.
* @returns The worker key.
@@
-169,14
+165,17
@@
export abstract class AbstractPool<
/** {@inheritDoc} */
public getWorkerRunningTasks (worker: Worker): number | undefined {
/** {@inheritDoc} */
public getWorkerRunningTasks (worker: Worker): number | undefined {
- return this.workers.get(this.getWorkerKey(worker) as number)?.tasksUsage
- ?.running
+ return this.getWorkerTasksUsage(worker)?.running
+ }
+
+ /** {@inheritDoc} */
+ public getWorkerRunTasks (worker: Worker): number | undefined {
+ return this.getWorkerTasksUsage(worker)?.run
}
/** {@inheritDoc} */
public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
}
/** {@inheritDoc} */
public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
- return this.workers.get(this.getWorkerKey(worker) as number)?.tasksUsage
- ?.avgRunTime
+ return this.getWorkerTasksUsage(worker)?.avgRunTime
}
/** {@inheritDoc} */
}
/** {@inheritDoc} */
@@
-220,16
+219,15
@@
export abstract class AbstractPool<
/** {@inheritDoc} */
public async execute (data: Data): Promise<Response> {
/** {@inheritDoc} */
public async execute (data: Data): Promise<Response> {
- // Configure worker to handle message with the specified task
const worker = this.chooseWorker()
const worker = this.chooseWorker()
- const res = this.internalExecute(worker, this.nextMessageId)
+ const messageId = crypto.randomUUID()
+ const res = this.internalExecute(worker, messageId)
this.checkAndEmitBusy()
this.sendToWorker(worker, {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
this.checkAndEmitBusy()
this.sendToWorker(worker, {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
- id:
this.nextM
essageId
+ id:
m
essageId
})
})
- ++this.nextMessageId
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
// eslint-disable-next-line @typescript-eslint/return-await
return res
}
@@
-398,7
+396,7
@@
export abstract class AbstractPool<
private async internalExecute (
worker: Worker,
private async internalExecute (
worker: Worker,
- messageId:
number
+ messageId:
string
): Promise<Response> {
this.beforePromiseWorkerResponseHook(worker)
return await new Promise<Response>((resolve, reject) => {
): Promise<Response> {
this.beforePromiseWorkerResponseHook(worker)
return await new Promise<Response>((resolve, reject) => {
@@
-431,7
+429,7
@@
export abstract class AbstractPool<
}
/**
}
/**
- * Get tasks usage of the given worker.
+ * Get
s
tasks usage of the given worker.
*
* @param worker - Worker which tasks usage is returned.
*/
*
* @param worker - Worker which tasks usage is returned.
*/