repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix: revert incorrect change
[poolifier.git]
/
src
/
pools
/
worker-node.ts
diff --git
a/src/pools/worker-node.ts
b/src/pools/worker-node.ts
index fbd00096a769d53cd5d3d9415fae64c428cfb3ee..03fc0df192f3ce6689306ba3f6eb38c12dabed40 100644
(file)
--- a/
src/pools/worker-node.ts
+++ b/
src/pools/worker-node.ts
@@
-9,6
+9,8
@@
import {
} from '../utils'
import { Deque } from '../deque'
import {
} from '../utils'
import { Deque } from '../deque'
import {
+ type BackPressureCallback,
+ type EmptyQueueCallback,
type IWorker,
type IWorkerNode,
type WorkerInfo,
type IWorker,
type IWorkerNode,
type WorkerInfo,
@@
-30,18
+32,18
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public readonly info: WorkerInfo
/** @inheritdoc */
/** @inheritdoc */
public readonly info: WorkerInfo
/** @inheritdoc */
- public messageChannel?: MessageChannel
- /** @inheritdoc */
public usage: WorkerUsage
/** @inheritdoc */
public usage: WorkerUsage
/** @inheritdoc */
+ public messageChannel?: MessageChannel
+ /** @inheritdoc */
public tasksQueueBackPressureSize: number
/** @inheritdoc */
public tasksQueueBackPressureSize: number
/** @inheritdoc */
- public onBackPressure?:
(workerId: number) => void
+ public onBackPressure?:
BackPressureCallback
/** @inheritdoc */
/** @inheritdoc */
- public onEmptyQueue?: (workerId: number) => void
- private readonly taskFunctionsUsage: Map<string, WorkerUsage>
+ public onEmptyQueue?: EmptyQueueCallback
private readonly tasksQueue: Deque<Task<Data>>
private onEmptyQueueCount: number
private readonly tasksQueue: Deque<Task<Data>>
private onEmptyQueueCount: number
+ private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
* Constructs a new worker node.
/**
* Constructs a new worker node.
@@
-75,14
+77,14
@@
implements IWorkerNode<Worker, Data> {
}
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
}
this.worker = worker
this.info = this.initWorkerInfo(worker, workerType)
+ this.usage = this.initWorkerUsage()
if (workerType === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
if (workerType === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
- this.usage = this.initWorkerUsage()
- this.taskFunctionsUsage = new Map<string, WorkerUsage>()
- this.tasksQueue = new Deque<Task<Data>>()
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.tasksQueue = new Deque<Task<Data>>()
this.onEmptyQueueCount = 0
this.onEmptyQueueCount = 0
+ this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
}
/** @inheritdoc */
@@
-180,13
+182,12
@@
implements IWorkerNode<Worker, Data> {
private async startOnEmptyQueue (): Promise<void> {
if (
this.onEmptyQueueCount > 0 &&
private async startOnEmptyQueue (): Promise<void> {
if (
this.onEmptyQueueCount > 0 &&
- this.usage.tasks.executing > 0 &&
- this.tasksQueue.size > 0
+ (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
) {
this.onEmptyQueueCount = 0
return
}
) {
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as
(workerId: number) => void
)(this.info.id as number)
+ (this.onEmptyQueue as
EmptyQueueCallback
)(this.info.id as number)
++this.onEmptyQueueCount
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
++this.onEmptyQueueCount
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()