import {
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
+ isPlainObject,
median
} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
)
} else if (!Number.isSafeInteger(numberOfWorkers)) {
throw new TypeError(
- 'Cannot instantiate a pool with a non integer number of workers'
+ 'Cannot instantiate a pool with a non safe integer number of workers'
)
} else if (numberOfWorkers < 0) {
throw new RangeError(
}
private checkPoolOptions (opts: PoolOptions<Worker>): void {
- this.opts.workerChoiceStrategy =
- opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
- this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
- this.opts.workerChoiceStrategyOptions =
- opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
- this.opts.enableEvents = opts.enableEvents ?? true
- this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
- if (this.opts.enableTasksQueue) {
- this.checkValidTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
- )
- this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
- opts.tasksQueueOptions as TasksQueueOptions
- )
+ if (isPlainObject(opts)) {
+ this.opts.workerChoiceStrategy =
+ opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+ this.opts.workerChoiceStrategyOptions =
+ opts.workerChoiceStrategyOptions ??
+ DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.opts.enableEvents = opts.enableEvents ?? true
+ this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
+ if (this.opts.enableTasksQueue) {
+ this.checkValidTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
+ opts.tasksQueueOptions as TasksQueueOptions
+ )
+ }
+ } else {
+ throw new TypeError('Invalid pool options: must be a plain object')
}
}
}
}
+ private checkValidWorkerChoiceStrategyOptions (
+ workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
+ ): void {
+ if (!isPlainObject(workerChoiceStrategyOptions)) {
+ throw new TypeError(
+ 'Invalid worker choice strategy options: must be a plain object'
+ )
+ }
+ }
+
private checkValidTasksQueueOptions (
tasksQueueOptions: TasksQueueOptions
): void {
+ if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
+ throw new TypeError('Invalid tasks queue options: must be a plain object')
+ }
if ((tasksQueueOptions?.concurrency as number) <= 0) {
throw new Error(
`Invalid worker tasks concurrency '${
public setWorkerChoiceStrategyOptions (
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
+ this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
this.workerChoiceStrategyContext.setOptions(
this.opts.workerChoiceStrategyOptions
let chosenWorkerNodeKey!: number
for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
this.computeWorkerVirtualTaskTimestamp(workerNodeKey)
- const workerLastVirtualTaskEndTimestamp =
+ const workerVirtualTaskEndTimestamp =
this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? 0
- if (
- workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
- ) {
- minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
+ if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
+ minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
chosenWorkerNodeKey = workerNodeKey
}
}
/** @inheritDoc */
public choose (): number {
const chosenWorkerNodeKey = this.currentWorkerNodeId
- const workerTaskRunTime = this.workerVirtualTaskRunTime ?? 0
+ const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime ?? 0
const workerTaskWeight =
this.opts.weights?.[chosenWorkerNodeKey] ?? this.defaultWorkerWeight
- if (workerTaskRunTime < workerTaskWeight) {
+ if (workerVirtualTaskRunTime < workerTaskWeight) {
this.workerVirtualTaskRunTime =
- workerTaskRunTime +
+ workerVirtualTaskRunTime +
(this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
} else {
this.currentWorkerNodeId =
}
return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2
}
+
+export const isPlainObject = (obj: unknown): boolean =>
+ typeof obj === 'object' &&
+ obj !== null &&
+ obj?.constructor === Object &&
+ Object.prototype.toString.call(obj) === '[object Object]'
WorkerFunction,
WorkerSyncFunction
} from '../utility-types'
-import { EMPTY_FUNCTION } from '../utils'
+import { EMPTY_FUNCTION, isPlainObject } from '../utils'
import type { KillBehavior, WorkerOptions } from './worker-options'
import { KillBehaviors } from './worker-options'
typeof taskFunctions !== 'function' &&
typeof taskFunctions !== 'object'
) {
- throw new Error('taskFunctions parameter is not a function or an object')
- }
- if (
- typeof taskFunctions === 'object' &&
- taskFunctions.constructor !== Object &&
- Object.prototype.toString.call(taskFunctions) !== '[object Object]'
- ) {
- throw new Error('taskFunctions parameter is not an object literal')
+ throw new TypeError(
+ 'taskFunctions parameter is not a function or an object'
+ )
}
this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
- if (typeof taskFunctions !== 'function') {
+ if (typeof taskFunctions === 'function') {
+ this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
+ } else if (isPlainObject(taskFunctions)) {
let firstEntry = true
for (const [name, fn] of Object.entries(taskFunctions)) {
if (typeof fn !== 'function') {
- throw new Error(
+ throw new TypeError(
'A taskFunctions parameter object value is not a function'
)
}
throw new Error('taskFunctions parameter object is empty')
}
} else {
- this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
+ throw new TypeError('taskFunctions parameter is not an object literal')
}
}
new FixedThreadPool(0.25, './tests/worker-files/thread/testWorker.js')
).toThrowError(
new TypeError(
- 'Cannot instantiate a pool with a non integer number of workers'
+ 'Cannot instantiate a pool with a non safe integer number of workers'
)
)
})
new TypeError('taskFunctions parameter is not an object literal')
)
expect(() => new ClusterWorker({})).toThrowError(
- new TypeError('taskFunctions parameter object is empty')
+ new Error('taskFunctions parameter object is empty')
)
})