repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
test: refine test naming
[poolifier.git]
/
src
/
pools
/
worker-node.ts
diff --git
a/src/pools/worker-node.ts
b/src/pools/worker-node.ts
index 7ea617fbc4cd3023770699585a83757596efe12f..d29a6e323f35208e22b6f87ca4610fb0af1b3e34 100644
(file)
--- a/
src/pools/worker-node.ts
+++ b/
src/pools/worker-node.ts
@@
-9,7
+9,7
@@
import {
checkWorkerNodeArguments,
createWorker,
getWorkerId,
checkWorkerNodeArguments,
createWorker,
getWorkerId,
- getWorkerType
+ getWorkerType
,
} from './utils.js'
import {
type EventHandler,
} from './utils.js'
import {
type EventHandler,
@@
-21,12
+21,11
@@
import {
type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
- type WorkerUsage
+ type WorkerUsage
,
} from './worker.js'
/**
* Worker node.
} from './worker.js'
/**
* Worker node.
- *
* @typeParam Worker - Type of worker.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
* @typeParam Worker - Type of worker.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
@@
-51,7
+50,6
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
/**
* Constructs a new worker node.
/**
* Constructs a new worker node.
- *
* @param type - The worker type.
* @param filePath - Path to the worker file.
* @param opts - The worker node options.
* @param type - The worker type.
* @param filePath - Path to the worker file.
* @param opts - The worker node options.
@@
-61,7
+59,7
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
checkWorkerNodeArguments(type, filePath, opts)
this.worker = createWorker<Worker>(type, filePath, {
env: opts.env,
checkWorkerNodeArguments(type, filePath, opts)
this.worker = createWorker<Worker>(type, filePath, {
env: opts.env,
- workerOptions: opts.workerOptions
+ workerOptions: opts.workerOptions
,
})
this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
})
this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
@@
-70,11
+68,19
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
- this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
+ this.tasksQueue = new PriorityQueue<Task<Data>>(
+ opts.tasksQueueBucketSize,
+ opts.tasksQueuePriority
+ )
this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
+ /** @inheritdoc */
+ public setTasksQueuePriority (enablePriority: boolean): void {
+ this.tasksQueue.enablePriority = enablePriority
+ }
+
/** @inheritdoc */
public tasksQueueSize (): number {
return this.tasksQueue.size
/** @inheritdoc */
public tasksQueueSize (): number {
return this.tasksQueue.size
@@
-114,7
+120,7
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
/** @inheritdoc */
public dequeueLastPrioritizedTask (): Task<Data> | undefined {
// Start from the last empty or partially filled bucket
/** @inheritdoc */
public dequeueLastPrioritizedTask (): Task<Data> | undefined {
// Start from the last empty or partially filled bucket
- return this.dequeueTask()
+ return this.dequeueTask(
this.tasksQueue.buckets + 1
)
}
/** @inheritdoc */
}
/** @inheritdoc */
@@
-214,7
+220,7
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
dynamic: false,
ready: false,
stealing: false,
dynamic: false,
ready: false,
stealing: false,
- backPressure: false
+ backPressure: false
,
}
}
}
}
@@
-237,22
+243,22
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
},
sequentiallyStolen: 0,
stolen: 0,
},
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0
,
},
runTime: {
},
runTime: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
waitTime: {
},
waitTime: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
elu: {
idle: {
},
elu: {
idle: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
active: {
},
active: {
- history: new CircularBuffer(MeasurementHistorySize)
- }
- }
+ history: new CircularBuffer(MeasurementHistorySize)
,
+ }
,
+ }
,
}
}
}
}
@@
-280,22
+286,22
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
},
sequentiallyStolen: 0,
stolen: 0,
},
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0
,
},
runTime: {
},
runTime: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
waitTime: {
},
waitTime: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
elu: {
idle: {
},
elu: {
idle: {
- history: new CircularBuffer(MeasurementHistorySize)
+ history: new CircularBuffer(MeasurementHistorySize)
,
},
active: {
},
active: {
- history: new CircularBuffer(MeasurementHistorySize)
- }
- }
+ history: new CircularBuffer(MeasurementHistorySize)
,
+ }
,
+ }
,
}
}
}
}
}
}