### Changed
- Disable publication on GitHub packages registry on release until authentication issue is fixed.
+- Rename `listTaskFunctions()` to `listTaskFunctionNames()` in pool and worker API.
+
+### Added
+
+- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
+ ### Added
+
+ - Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+ - Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
+ - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
+
## [2.6.44] - 2023-09-08
### Fixed
- Optimize worker alive status check.
- BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
- Optimize `LESS_USED` worker choice strategy.
- - Update benchmarks versus external threads pools.
+ - Update benchmark versus external threads pools.
- Optimize tasks usage statistics requirements for worker choice strategy.
### Fixed
- Optimize worker alive status check.
- BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
- Optimize `LESS_USED` worker choice strategy.
- - Update benchmarks versus external threads pools.
+ - Update benchmark versus external threads pools.
### Fixed
### Changed
- Optimize fair share task scheduling algorithm implementation.
- - Update benchmarks versus external pools results with latest version.
+ - Update benchmark versus external pools results with latest version.
## [2.3.3] - 2022-10-15
- [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
+ - [`pool.start()`](#poolstart)
- [`pool.destroy()`](#pooldestroy)
- - [`pool.listTaskFunctions()`](#poollisttaskfunctions)
+ - [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
- [`PoolOptions`](#pooloptions)
- [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
- [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
- [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
- [`YourWorker.addTaskFunction(name, fn)`](#yourworkeraddtaskfunctionname-fn)
- [`YourWorker.removeTaskFunction(name)`](#yourworkerremovetaskfunctionname)
- - [`YourWorker.listTaskFunctions()`](#yourworkerlisttaskfunctions)
+ - [`YourWorker.listTaskFunctionNames()`](#yourworkerlisttaskfunctionnames)
- [`YourWorker.setDefaultTaskFunction(name)`](#yourworkersetdefaulttaskfunctionname)
## Pool
This method is available on both pool implementations and returns a promise with the task function execution response.
+ ### `pool.start()`
+
+ This method is available on both pool implementations and will start the minimum number of workers.
+
### `pool.destroy()`
This method is available on both pool implementations and will call the terminate method on each worker.
-### `pool.listTaskFunctions()`
+### `pool.listTaskFunctionNames()`
This method is available on both pool implementations and returns an array of the task function names.
An object with these properties:
- - `onlineHandler` (optional) - A function that will listen for online event on each worker
- - `messageHandler` (optional) - A function that will listen for message event on each worker
- - `errorHandler` (optional) - A function that will listen for error event on each worker
- - `exitHandler` (optional) - A function that will listen for exit event on each worker
+ - `onlineHandler` (optional) - A function that will listen for online event on each worker.
+ Default: `() => {}`
+ - `messageHandler` (optional) - A function that will listen for message event on each worker.
+ Default: `() => {}`
+ - `errorHandler` (optional) - A function that will listen for error event on each worker.
+ Default: `() => {}`
+ - `exitHandler` (optional) - A function that will listen for exit event on each worker.
+ Default: `() => {}`
+
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+ - `startWorkers` (optional) - Start the minimum number of workers at pool creation.
+ Default: `true`
- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
Default: `true`
- `enableEvents` (optional) - Events emission enablement in this pool.
- `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
+ - `taskStealing` (optional) - Task stealing enablement.
+ - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure.
- Default: `{ size: (pool maximum size)^2, concurrency: 1 }`
+ Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
#### `ThreadPoolOptions extends PoolOptions`
This method is available on both worker implementations and returns a boolean.
-#### `YourWorker.listTaskFunctions()`
+#### `YourWorker.listTaskFunctionNames()`
This method is available on both worker implementations and returns an array of the task function names.
updateMeasurementStatistics
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
PoolEmitter,
public readonly emitter?: PoolEmitter
/**
- * The task execution response promise map.
- *
+ * The task execution response promise map:
* - `key`: The message id of each submitted task.
* - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
*
*/
protected readonly max?: number
- /**
- * Whether the pool is starting or not.
- */
- private readonly starting: boolean
+ /**
+ * The task functions added at runtime map:
+ * - `key`: The task function name.
+ * - `value`: The task function itself.
+ */
+ private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
/**
* Whether the pool is started or not.
*/
private started: boolean
+ /**
+ * Whether the pool is starting or not.
+ */
+ private starting: boolean
/**
* The start timestamp of the pool.
*/
this.setupHook()
- this.starting = true
- this.startPool()
+ this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
+ this.started = false
this.starting = false
- this.started = true
+ if (this.opts.startWorkers === true) {
+ this.start()
+ }
this.startTimestamp = performance.now()
}
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
+ this.opts.startWorkers = opts.startWorkers ?? true
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
`Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
)
}
- if (tasksQueueOptions?.queueMaxSize != null) {
- throw new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- }
if (
tasksQueueOptions?.size != null &&
!Number.isSafeInteger(tasksQueueOptions?.size)
}
}
- private startPool (): void {
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.numberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
version,
type: this.type,
worker: this.worker,
+ started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerNodeKeyByWorkerId (workerId: number): number {
+ private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
return this.workerNodes.findIndex(
workerNode => workerNode.info.id === workerId
)
return {
...{
size: Math.pow(this.maxSize, 2),
- concurrency: 1
+ concurrency: 1,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
},
...tasksQueueOptions
}
(this.opts.tasksQueueOptions?.concurrency as number)
) === -1
)
- } else {
- return (
- this.workerNodes.findIndex(
- workerNode =>
- workerNode.info.ready && workerNode.usage.tasks.executing === 0
- ) === -1
- )
}
+ return (
+ this.workerNodes.findIndex(
+ workerNode =>
+ workerNode.info.ready && workerNode.usage.tasks.executing === 0
+ ) === -1
+ )
+ }
+
+ private async sendTaskFunctionOperationToWorker (
+ workerNodeKey: number,
+ message: MessageValue<Data>
+ ): Promise<boolean> {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ return await new Promise<boolean>((resolve, reject) => {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === true
+ ) {
+ resolve(true)
+ } else if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === false
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ })
+ this.sendToWorker(workerNodeKey, message)
+ })
+ }
+
+ private async sendTaskFunctionOperationToWorkers (
+ message: Omit<MessageValue<Data>, 'workerId'>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Data | Response>>()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId as number}`
+ )
+ )
+ }
+ }
+ })
+ this.sendToWorker(workerNodeKey, message)
+ }
+ })
}
/** @inheritDoc */
- public listTaskFunctions (): string[] {
+ public hasTaskFunction (name: string): boolean {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctions) &&
- workerNode.info.taskFunctions.length > 0
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.includes(name)
) {
- return workerNode.info.taskFunctions
+ return true
+ }
+ }
+ return false
+ }
+
+ /** @inheritDoc */
+ public async addTaskFunction (
+ name: string,
+ taskFunction: TaskFunction<Data, Response>
+ ): Promise<boolean> {
+ this.taskFunctions.set(name, taskFunction)
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'add',
+ taskFunctionName: name,
+ taskFunction: taskFunction.toString()
+ })
+ }
+
+ /** @inheritDoc */
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ this.taskFunctions.delete(name)
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'remove',
+ taskFunctionName: name
+ })
+ }
+
+ /** @inheritDoc */
+ public listTaskFunctionNames (): string[] {
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.length > 0
+ ) {
+ return workerNode.info.taskFunctionNames
}
}
return []
}
+ /** @inheritDoc */
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'default',
+ taskFunctionName: name
+ })
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
- reject(new Error('Cannot execute a task on destroyed pool'))
+ reject(new Error('Cannot execute a task on not started pool'))
return
}
if (name != null && typeof name !== 'string') {
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
})
}
+ /** @inheritdoc */
+ public start (): void {
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.starting = false
+ this.started = true
+ }
+
/** @inheritDoc */
public async destroy (): Promise<void> {
await Promise.all(
}
protected async sendKillMessageToWorker (
- workerNodeKey: number,
- workerId: number
+ workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.registerWorkerMessageListener(workerNodeKey, message => {
if (message.kill === 'success') {
resolve()
} else if (message.kill === 'failure') {
- reject(new Error(`Worker ${workerId} kill message handling failed`))
+ reject(
+ new Error(
+ `Worker ${
+ message.workerId as number
+ } kill message handling failed`
+ )
+ )
}
})
- this.sendToWorker(workerNodeKey, { kill: true, workerId })
+ this.sendToWorker(workerNodeKey, { kill: true })
})
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctions) &&
- workerInfo.taskFunctions.length > 2
+ Array.isArray(workerInfo.taskFunctionNames) &&
+ workerInfo.taskFunctionNames.length > 2
)
}
) {
--workerTaskStatistics.executing
}
- if (message.taskError == null) {
+ if (message.workerError == null) {
++workerTaskStatistics.executed
} else {
++workerTaskStatistics.failed
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
updateMeasurementStatistics(
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
})
const workerInfo = this.getWorkerInfo(workerNodeKey)
this.sendToWorker(workerNodeKey, {
- checkActive: true,
- workerId: workerInfo.id as number
+ checkActive: true
})
+ if (this.taskFunctions.size > 0) {
+ for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+ taskFunctionOperation: 'add',
+ taskFunctionName,
+ taskFunction: taskFunction.toString()
+ }).catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ }
workerInfo.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ if (this.opts.tasksQueueOptions?.taskStealing === true) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
}
}
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- },
- workerId: this.getWorkerInfo(workerNodeKey).id as number
+ }
})
}
},
0
)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
- const task = {
- ...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = this.dequeueTask(workerNodeKey) as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
private taskStealingOnEmptyQueue (workerId: number): void {
const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const workerNodes = this.workerNodes
.slice()
.sort(
workerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: workerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
} else {
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctions != null) {
+ if (message.ready != null && message.taskFunctionNames != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctions != null) {
- // Task functions message received from worker
+ } else if (message.taskFunctionNames != null) {
+ // Task function names message received from worker
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctions = message.taskFunctions
+ ).taskFunctionNames = message.taskFunctionNames
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
if (message.ready === false) {
- throw new Error(`Worker ${message.workerId} failed to initialize`)
+ throw new Error(
+ `Worker ${message.workerId as number} failed to initialize`
+ )
}
const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
)
workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctions = message.taskFunctions
+ workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, taskError, data } = message
+ const { taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, taskError)
- promiseResponse.reject(taskError.message)
+ if (workerError != null) {
+ this.emitter?.emit(PoolEvents.taskError, workerError)
+ promiseResponse.reject(workerError.message)
} else {
promiseResponse.resolve(data as Response)
}
import { EventEmitter } from 'node:events'
import { type TransferListItem } from 'node:worker_threads'
+import type { TaskFunction } from '../worker/task-functions'
import type {
ErrorHandler,
ExitHandler,
readonly version: string
readonly type: PoolType
readonly worker: WorkerType
+ readonly started: boolean
readonly ready: boolean
readonly strategy: WorkerChoiceStrategy
readonly minSize: number
* @defaultValue (pool maximum size)^2
*/
readonly size?: number
- /**
- * @deprecated Use `size` instead.
- */
- readonly queueMaxSize?: number
/**
* Maximum number of tasks that can be executed concurrently on a worker node.
*
* @defaultValue 1
*/
readonly concurrency?: number
+ /**
+ * Whether to enable task stealing.
+ *
+ * @defaultValue true
+ */
+ readonly taskStealing?: boolean
+ /**
+ * Whether to enable tasks stealing on back pressure.
+ *
+ * @defaultValue true
+ */
+ readonly tasksStealingOnBackPressure?: boolean
}
/**
* A function that will listen for exit event on each worker.
*/
exitHandler?: ExitHandler<Worker>
+ /**
+ * Whether to start the minimum number of workers at pool initialization.
+ *
+ * @defaultValue false
+ */
+ startWorkers?: boolean
/**
* The worker choice strategy to use in this pool.
*
name?: string,
transferList?: TransferListItem[]
) => Promise<Response>
+ /**
+ * Starts the minimum number of workers in this pool.
+ */
+ readonly start: () => void
/**
* Terminates all workers in this pool.
*/
readonly destroy: () => Promise<void>
+ /**
+ * Whether the specified task function exists in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function exists, `false` otherwise.
+ */
+ readonly hasTaskFunction: (name: string) => boolean
+ /**
+ * Adds a task function to this pool.
+ * If a task function with the same name already exists, it will be overwritten.
+ *
+ * @param name - The name of the task function.
+ * @param taskFunction - The task function.
+ * @returns `true` if the task function was added, `false` otherwise.
+ */
+ readonly addTaskFunction: (
+ name: string,
+ taskFunction: TaskFunction<Data, Response>
+ ) => Promise<boolean>
+ /**
+ * Removes a task function from this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the task function was removed, `false` otherwise.
+ */
+ readonly removeTaskFunction: (name: string) => Promise<boolean>
/**
* Lists the names of task function available in this pool.
*
* @returns The names of task function available in this pool.
*/
- readonly listTaskFunctions: () => string[]
+ readonly listTaskFunctionNames: () => string[]
+ /**
+ * Sets the default task function in this pool.
+ *
+ * @param name - The name of the task function.
+ * @returns `true` if the default task function was set, `false` otherwise.
+ */
+ readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
/**
* Sets the worker choice strategy in this pool.
*
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
- if (!Array.isArray(this.info.taskFunctions)) {
+ if (!Array.isArray(this.info.taskFunctionNames)) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
)
}
if (
- Array.isArray(this.info.taskFunctions) &&
- this.info.taskFunctions.length < 3
+ Array.isArray(this.info.taskFunctionNames) &&
+ this.info.taskFunctionNames.length < 3
) {
throw new Error(
`Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
)
}
if (name === DEFAULT_TASK_NAME) {
- name = this.info.taskFunctions[1]
+ name = this.info.taskFunctionNames[1]
}
if (!this.taskFunctionsUsage.has(name)) {
this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
for (const task of this.tasksQueue) {
if (
(task.name === DEFAULT_TASK_NAME &&
- name === (this.info.taskFunctions as string[])[1]) ||
+ name === (this.info.taskFunctionNames as string[])[1]) ||
(task.name !== DEFAULT_TASK_NAME && name === task.name)
) {
++taskFunctionQueueSize
const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
const { version } = require('../../../package.json')
const { waitPoolEvents } = require('../../test-utils')
+ const { WorkerNode } = require('../../../lib/pools/worker-node')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
'./tests/worker-files/thread/testWorker.js'
)
expect(pool.emitter).toBeInstanceOf(EventEmitter)
- expect(pool.opts.enableEvents).toBe(true)
- expect(pool.opts.restartWorkerOnError).toBe(true)
- expect(pool.opts.enableTasksQueue).toBe(false)
- expect(pool.opts.tasksQueueOptions).toBeUndefined()
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.ROUND_ROBIN
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: true,
+ restartWorkerOnError: true,
+ enableTasksQueue: false,
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
elu: { median: false }
})
}
- expect(pool.opts.messageHandler).toBeUndefined()
- expect(pool.opts.errorHandler).toBeUndefined()
- expect(pool.opts.onlineHandler).toBeUndefined()
- expect(pool.opts.exitHandler).toBeUndefined()
await pool.destroy()
const testHandler = () => console.info('test handler executed')
pool = new FixedThreadPool(
}
)
expect(pool.emitter).toBeUndefined()
- expect(pool.opts.enableEvents).toBe(false)
- expect(pool.opts.restartWorkerOnError).toBe(false)
- expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({
- concurrency: 2,
- size: 4
- })
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.LEAST_USED
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: false,
+ restartWorkerOnError: false,
+ enableTasksQueue: true,
+ tasksQueueOptions: {
+ concurrency: 2,
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
+ },
+ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: { 0: 300, 1: 200 }
+ },
+ onlineHandler: testHandler,
+ messageHandler: testHandler,
+ errorHandler: testHandler,
+ exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
weights: { 0: 300, 1: 200 }
})
}
- expect(pool.opts.messageHandler).toStrictEqual(testHandler)
- expect(pool.opts.errorHandler).toStrictEqual(testHandler)
- expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
- expect(pool.opts.exitHandler).toStrictEqual(testHandler)
await pool.destroy()
})
).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.js',
- {
- enableTasksQueue: true,
- tasksQueueOptions: { queueMaxSize: 2 }
- }
- )
- ).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(
() =>
new FixedThreadPool(
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.setTasksQueueOptions({ concurrency: 2 })
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
new RangeError(
'Invalid worker node tasks queue size: 0 is a negative integer or zero'
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: numberOfWorkers,
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: Math.floor(numberOfWorkers / 2),
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.usage).toStrictEqual({
tasks: {
executed: 0,
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.cluster,
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.thread,
await pool.destroy()
})
+ it('Verify that pool can be started after initialization', async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js',
+ {
+ startWorkers: false
+ }
+ )
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ expect(pool.workerNodes).toStrictEqual([])
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
+ )
+ pool.start()
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(pool.workerNodes.length).toBe(numberOfWorkers)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
+ }
+ await pool.destroy()
+ })
+
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
"Task function 'unknown' not found"
)
await pool.destroy()
- await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
- new Error('Cannot execute a task on destroyed pool')
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
)
})
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'./tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
+ expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctions).toStrictEqual([
+ expect(workerNode.info.taskFunctionNames).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'fibonacci'
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctions()) {
+ for (const name of pool.listTaskFunctionNames()) {
expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
tasks: {
executed: expect.any(Number),
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
- workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionNames[1]
+ )
)
}
await pool.destroy()