repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
feat: introduce worker node queue back pressure detection
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 25c0eba870ee62560e7d47c69cad7909fa4ccc4e..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-646,14
+646,15
@@
export abstract class AbstractPool<
/** @inheritDoc */
public listTaskFunctions (): string[] {
/** @inheritDoc */
public listTaskFunctions (): string[] {
- if (
- Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
- (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
- ) {
- return this.getWorkerInfo(0).taskFunctions as string[]
- } else {
- return []
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctions) &&
+ workerNode.info.taskFunctions.length > 0
+ ) {
+ return workerNode.info.taskFunctions
+ }
}
}
+ return []
}
/** @inheritDoc */
}
/** @inheritDoc */
@@
-678,12
+679,11
@@
export abstract class AbstractPool<
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
}
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
if (
name != null &&
if (
name != null &&
- Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
- !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
- name
- )
+ Array.isArray(workerInfo.taskFunctions) &&
+ !workerInfo.taskFunctions.includes(name)
) {
reject(
new Error(`Task function '${name}' is not registered in the pool`)
) {
reject(
new Error(`Task function '${name}' is not registered in the pool`)
@@
-695,7
+695,7
@@
export abstract class AbstractPool<
data: data ?? ({} as Data),
transferList,
timestamp,
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId:
this.getWorkerInfo(workerNodeKey)
.id as number,
+ workerId:
workerInfo
.id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
@@
-816,9
+816,10
@@
export abstract class AbstractPool<
}
private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
}
private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
return (
- Array.isArray(
this.getWorkerInfo(workerNodeKey)
.taskFunctions) &&
-
(this.getWorkerInfo(workerNodeKey).taskFunctions as string[])
.length > 1
+ Array.isArray(
workerInfo
.taskFunctions) &&
+
workerInfo.taskFunctions
.length > 1
)
}
)
}
@@
-1125,7
+1126,7
@@
export abstract class AbstractPool<
protected workerListener (): (message: MessageValue<Response>) => void {
return (message) => {
this.checkMessageWorkerId(message)
protected workerListener (): (message: MessageValue<Response>) => void {
return (message) => {
this.checkMessageWorkerId(message)
- if (message.ready != null) {
+ if (message.ready != null
&& message.taskFunctions != null
) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
@@
-1144,28
+1145,29
@@
export abstract class AbstractPool<
if (message.ready === false) {
throw new Error(`Worker ${message.workerId} failed to initialize`)
}
if (message.ready === false) {
throw new Error(`Worker ${message.workerId} failed to initialize`)
}
- this.getWorkerInfo(
+
const workerInfo =
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).ready = message.ready as boolean
+ )
+ workerInfo.ready = message.ready as boolean
+ workerInfo.taskFunctions = message.taskFunctions
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const promiseResponse = this.promiseResponseMap.get(
- message.taskId as string
- )
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
if (promiseResponse != null) {
- if (
message.
taskError != null) {
- this.emitter?.emit(PoolEvents.taskError,
message.
taskError)
- promiseResponse.reject(
message.
taskError.message)
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
} else {
} else {
- promiseResponse.resolve(
message.
data as Response)
+ promiseResponse.resolve(data as Response)
}
const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
}
const workerNodeKey = promiseResponse.workerNodeKey
this.afterTaskExecutionHook(workerNodeKey, message)
- this.promiseResponseMap.delete(
message.
taskId as string)
+ this.promiseResponseMap.delete(taskId as string)
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0 &&
if (
this.opts.enableTasksQueue === true &&
this.tasksQueueSize(workerNodeKey) > 0 &&
@@
-1210,7
+1212,11
@@
export abstract class AbstractPool<
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
*/
private addWorkerNode (worker: Worker): number {
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
*/
private addWorkerNode (worker: Worker): number {
- const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+ const workerNode = new WorkerNode<Worker, Data>(
+ worker,
+ this.worker,
+ this.maxSize
+ )
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
@@
-1218,7
+1224,7
@@
export abstract class AbstractPool<
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
this.workerNodes.push(workerNode)
const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
if (workerNodeKey === -1) {
- throw new Error('Worker node not found')
+ throw new Error('Worker node
added
not found')
}
return workerNodeKey
}
}
return workerNodeKey
}
@@
-1248,6
+1254,15
@@
export abstract class AbstractPool<
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.workerNodes[workerNodeKey].hasBackPressure()
+ ) {
+ this.emitter?.emit(PoolEvents.backPressure, {
+ workerId: this.getWorkerInfo(workerNodeKey).id,
+ ...this.info
+ })
+ }
return this.workerNodes[workerNodeKey].enqueueTask(task)
}
return this.workerNodes[workerNodeKey].enqueueTask(task)
}