repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge dependabot/npm_and_yarn/examples/typescript/websocket-server-pool/ws-worker_thr...
[poolifier.git]
/
src
/
pools
/
worker-node.ts
diff --git
a/src/pools/worker-node.ts
b/src/pools/worker-node.ts
index ca275dedab806b9a4cc2c9c6aee5d3cf550b99f5..e9680261b591e26707c11c5ca868873f3ca5a6e4 100644
(file)
--- a/
src/pools/worker-node.ts
+++ b/
src/pools/worker-node.ts
@@
-46,6
+46,7
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
@@
-65,6
+66,7
@@
implements IWorkerNode<Worker, Data> {
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
@@
-77,8
+79,14
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
}
return tasksQueueSize
}
@@
-86,8
+94,14
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
}
return tasksQueueSize
}
@@
-95,7
+109,11
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
@@
-104,7
+122,11
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
@@
-139,21
+161,21
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
- if (!Array.isArray(this.info.taskFunctions)) {
+ if (!Array.isArray(this.info.taskFunction
Name
s)) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
)
}
if (
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
)
}
if (
- Array.isArray(this.info.taskFunctions) &&
- this.info.taskFunctions.length < 3
+ Array.isArray(this.info.taskFunction
Name
s) &&
+ this.info.taskFunction
Name
s.length < 3
) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
- name = this.info.taskFunctions[1]
+ name = this.info.taskFunction
Name
s[1]
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
@@
-161,6
+183,11
@@
implements IWorkerNode<Worker, Data> {
return this.taskFunctionsUsage.get(name)
}
return this.taskFunctionsUsage.get(name)
}
+ /** @inheritdoc */
+ public deleteTaskFunctionWorkerUsage (name: string): boolean {
+ return this.taskFunctionsUsage.delete(name)
+ }
+
private async startOnEmptyQueue (): Promise<void> {
if (
this.onEmptyQueueCount > 0 &&
private async startOnEmptyQueue (): Promise<void> {
if (
this.onEmptyQueueCount > 0 &&
@@
-169,8
+196,8
@@
implements IWorkerNode<Worker, Data> {
this.onEmptyQueueCount = 0
return
}
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
@@
-227,7
+254,7
@@
implements IWorkerNode<Worker, Data> {
for (const task of this.tasksQueue) {
if (
(task.name === DEFAULT_TASK_NAME &&
for (const task of this.tasksQueue) {
if (
(task.name === DEFAULT_TASK_NAME &&
- name === (this.info.taskFunctions as string[])[1]) ||
+ name === (this.info.taskFunction
Name
s as string[])[1]) ||
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize