"lines": 92,
"statements": 92,
"functions": 95,
- "branches": 92
+ "branches": 90
}
let minWorkerVirtualTaskEndTimestamp = Infinity
let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
+ if (!this.isWorkerNodeEligible(workerNodeKey)) {
+ continue
+ }
if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) {
this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey)
}
const workerVirtualTaskEndTimestamp =
this.workersVirtualTaskEndTimestamp[workerNodeKey]
- if (
- this.isWorkerNodeEligible(workerNodeKey) &&
- workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
- ) {
+ if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
chosenWorkerNodeKey = workerNodeKey
}
workerNodeKey < this.pool.workerNodes.length;
workerNodeKey++
) {
+ if (!this.isWorkerNodeEligible(workerNodeKey)) {
+ continue
+ }
const workerWeight =
this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
- if (
- this.isWorkerNodeEligible(workerNodeKey) &&
- workerWeight >= this.roundWeights[roundIndex]
- ) {
+ if (workerWeight >= this.roundWeights[roundIndex]) {
workerNodeId = workerNodeKey
break
}
let minTime = Infinity
let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+ if (!this.isWorkerNodeEligible(workerNodeKey)) {
+ continue
+ }
const workerTime =
(workerNode.usage.runTime?.aggregate ?? 0) +
(workerNode.usage.waitTime?.aggregate ?? 0)
- if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) {
+ if (workerTime === 0) {
chosenWorkerNodeKey = workerNodeKey
break
- } else if (
- this.isWorkerNodeEligible(workerNodeKey) &&
- workerTime < minTime
- ) {
+ } else if (workerTime < minTime) {
minTime = workerTime
chosenWorkerNodeKey = workerNodeKey
}
let minWorkerElu = Infinity
let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+ if (!this.isWorkerNodeEligible(workerNodeKey)) {
+ continue
+ }
const workerUsage = workerNode.usage
const workerElu = workerUsage.elu?.active?.aggregate ?? 0
- if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) {
+ if (workerElu === 0) {
chosenWorkerNodeKey = workerNodeKey
break
- } else if (
- this.isWorkerNodeEligible(workerNodeKey) &&
- workerElu < minWorkerElu
- ) {
+ } else if (workerElu < minWorkerElu) {
minWorkerElu = workerElu
chosenWorkerNodeKey = workerNodeKey
}
let minNumberOfTasks = Infinity
let chosenWorkerNodeKey: number | undefined
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+ if (!this.isWorkerNodeEligible(workerNodeKey)) {
+ continue
+ }
const workerTaskStatistics = workerNode.usage.tasks
const workerTasks =
workerTaskStatistics.executed +
workerTaskStatistics.executing +
workerTaskStatistics.queued
- if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) {
+ if (workerTasks === 0) {
chosenWorkerNodeKey = workerNodeKey
break
- } else if (
- this.isWorkerNodeEligible(workerNodeKey) &&
- workerTasks < minNumberOfTasks
- ) {
+ } else if (workerTasks < minNumberOfTasks) {
minNumberOfTasks = workerTasks
chosenWorkerNodeKey = workerNodeKey
}
return availableParallelism
}
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = <Worker extends IWorker>(
+ worker: Worker
+): WorkerType | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return WorkerTypes.thread
+ } else if (worker instanceof ClusterWorker) {
+ return WorkerTypes.cluster
+ }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = <Worker extends IWorker>(
+ worker: Worker
+): number | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return worker.threadId
+ } else if (worker instanceof ClusterWorker) {
+ return worker.id
+ }
+}
+
/**
* Sleeps for the given amount of milliseconds.
*
)
}
-/**
- * Returns the worker type of the given worker.
- *
- * @param worker - The worker to get the type of.
- * @returns The worker type of the given worker.
- * @internal
- */
-export const getWorkerType = <Worker extends IWorker>(
- worker: Worker
-): WorkerType | undefined => {
- if (worker instanceof ThreadWorker) {
- return WorkerTypes.thread
- }
- if (worker instanceof ClusterWorker) {
- return WorkerTypes.cluster
- }
-}
-
-/**
- * Returns the worker id of the given worker.
- *
- * @param worker - The worker to get the id of.
- * @returns The worker id of the given worker.
- * @internal
- */
-export const getWorkerId = <Worker extends IWorker>(
- worker: Worker
-): number | undefined => {
- if (worker instanceof ThreadWorker) {
- return worker.threadId
- } else if (worker instanceof ClusterWorker) {
- return worker.id
- }
-}
-
/**
* Computes the median of the given data set.
*
const { randomInt } = require('crypto')
+const { Worker } = require('worker_threads')
+const cluster = require('cluster')
const { expect } = require('expect')
const {
CircularArray,
availableParallelism,
average,
exponentialDelay,
+ getWorkerType,
+ getWorkerId,
isAsyncFunction,
isKillBehavior,
isPlainObject,
sleep,
updateMeasurementStatistics
} = require('../lib/utils')
-const { KillBehaviors } = require('../lib/worker/worker-options')
+const { KillBehaviors, WorkerTypes } = require('../lib')
describe('Utils test suite', () => {
it('Verify availableParallelism() behavior', () => {
- expect(typeof availableParallelism() === 'number').toBe(true)
- expect(availableParallelism()).toBeGreaterThan(0)
- expect(Number.isSafeInteger(availableParallelism())).toBe(true)
+ const parallelism = availableParallelism()
+ expect(typeof parallelism === 'number').toBe(true)
+ expect(parallelism).toBeGreaterThan(0)
+ expect(Number.isSafeInteger(parallelism)).toBe(true)
+ })
+
+ it('Verify getWorkerType() behavior', () => {
+ expect(
+ getWorkerType(new Worker('./tests/worker-files/thread/testWorker.js'))
+ ).toBe(WorkerTypes.thread)
+ expect(getWorkerType(cluster.fork())).toBe(WorkerTypes.cluster)
+ })
+
+ it('Verify getWorkerId() behavior', () => {
+ const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js')
+ const clusterWorker = cluster.fork()
+ expect(getWorkerId(threadWorker)).toBe(threadWorker.threadId)
+ expect(getWorkerId(clusterWorker)).toBe(clusterWorker.id)
})
it.skip('Verify sleep() behavior', async () => {