### Added
+- Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+- Add `tasksStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable tasks stealing or not and whether enable tasks stealing on back pressure or not.
- Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
## [2.6.44] - 2023-09-08
runPoolifierTest
} from '../benchmarks-utils.mjs'
+const poolifierSuite = new Benchmark.Suite('Poolifier', {
+ onCycle: event => {
+ console.info(event.target.toString())
+ },
+ onComplete: function () {
+ console.info(
+ 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ }
+})
+
const poolSize = availableParallelism()
-const pools = []
+const benchmarkSettings = []
for (const poolType of Object.values(PoolTypes)) {
for (const workerType of Object.values(WorkerTypes)) {
if (workerType === WorkerTypes.cluster) {
continue
}
for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
- for (const enableTasksQueue of [false]) {
+ for (const enableTasksQueue of [false, true]) {
if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
for (const measurement of [Measurements.runTime, Measurements.elu]) {
- pools.push([
+ benchmarkSettings.push([
`${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}|measurement:${measurement}`,
- buildPoolifierPool(workerType, poolType, poolSize, {
+ workerType,
+ poolType,
+ poolSize,
+ {
workerChoiceStrategy,
workerChoiceStrategyOptions: {
measurement
},
enableTasksQueue
- })
+ }
])
}
} else {
- pools.push([
+ benchmarkSettings.push([
`${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}`,
- buildPoolifierPool(workerType, poolType, poolSize, {
+ workerType,
+ poolType,
+ poolSize,
+ {
workerChoiceStrategy,
enableTasksQueue
- })
+ }
])
}
}
taskSize: 100
}
-const suite = new Benchmark.Suite('Poolifier')
-for (const [name, pool] of pools) {
- suite.add(name, async () => {
+for (const [
+ name,
+ workerType,
+ poolType,
+ poolSize,
+ poolOptions
+] of benchmarkSettings) {
+ poolifierSuite.add(name, async () => {
+ const pool = buildPoolifierPool(workerType, poolType, poolSize, poolOptions)
await runPoolifierTest(pool, {
taskExecutions,
workerData
})
+ await pool.destroy()
})
}
-suite
- .on('cycle', event => {
- console.info(event.target.toString())
- })
- .on('complete', function () {
- console.info(
- 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
- )
- // eslint-disable-next-line n/no-process-exit
- process.exit()
- })
- .run({ async: true, maxTime: 120 })
+poolifierSuite.run({ async: true })
- [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
+ - [`pool.start()`](#poolstart)
- [`pool.destroy()`](#pooldestroy)
- [`pool.listTaskFunctions()`](#poollisttaskfunctions)
- [`PoolOptions`](#pooloptions)
This method is available on both pool implementations and returns a promise with the task function execution response.
+### `pool.start()`
+
+This method is available on both pool implementations and will start the minimum number of workers.
+
### `pool.destroy()`
This method is available on both pool implementations and will call the terminate method on each worker.
- `messageHandler` (optional) - A function that will listen for message event on each worker
- `errorHandler` (optional) - A function that will listen for error event on each worker
- `exitHandler` (optional) - A function that will listen for exit event on each worker
+- `startWorkers` (optional) - Start the minimum number of workers at pool creation.
+ Default: `true`
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
- `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
+ - `tasksStealing` (optional) - Tasks stealing enablement.
+ - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure.
- Default: `{ size: (pool maximum size)^2, concurrency: 1 }`
+ Default: `{ size: (pool maximum size)^2, concurrency: 1, tasksStealing: true, tasksStealingOnBackPressure: true }`
#### `ThreadPoolOptions extends PoolOptions`
*/
protected readonly max?: number
- /**
- * Whether the pool is starting or not.
- */
- private readonly starting: boolean
/**
* Whether the pool is started or not.
*/
private started: boolean
+ /**
+ * Whether the pool is starting or not.
+ */
+ private starting: boolean
/**
* The start timestamp of the pool.
*/
this.setupHook()
- this.starting = true
- this.startPool()
+ this.started = false
this.starting = false
- this.started = true
+ if (this.opts.startWorkers === true) {
+ this.start()
+ }
this.startTimestamp = performance.now()
}
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
+ this.opts.startWorkers = opts.startWorkers ?? true
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
`Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
)
}
- if (tasksQueueOptions?.queueMaxSize != null) {
- throw new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- }
if (
tasksQueueOptions?.size != null &&
!Number.isSafeInteger(tasksQueueOptions?.size)
}
}
- private startPool (): void {
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.numberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
version,
type: this.type,
worker: this.worker,
+ started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
return {
...{
size: Math.pow(this.maxSize, 2),
- concurrency: 1
+ concurrency: 1,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
},
...tasksQueueOptions
}
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
- reject(new Error('Cannot execute a task on destroyed pool'))
+ reject(new Error('Cannot execute a task on not started pool'))
return
}
if (name != null && typeof name !== 'string') {
})
}
+ /** @inheritdoc */
+ public start (): void {
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.starting = false
+ this.started = true
+ }
+
/** @inheritDoc */
public async destroy (): Promise<void> {
await Promise.all(
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ if (this.opts.tasksQueueOptions?.tasksStealing === true) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
}
}
readonly version: string
readonly type: PoolType
readonly worker: WorkerType
+ readonly started: boolean
readonly ready: boolean
readonly strategy: WorkerChoiceStrategy
readonly minSize: number
* @defaultValue (pool maximum size)^2
*/
readonly size?: number
- /**
- * @deprecated Use `size` instead.
- */
- readonly queueMaxSize?: number
/**
* Maximum number of tasks that can be executed concurrently on a worker node.
*
* @defaultValue 1
*/
readonly concurrency?: number
+ /**
+ * Whether to enable tasks stealing.
+ *
+ * @defaultValue true
+ */
+ readonly tasksStealing?: boolean
+ /**
+ * Whether to enable tasks stealing on back pressure.
+ *
+ * @defaultValue true
+ */
+ readonly tasksStealingOnBackPressure?: boolean
}
/**
* A function that will listen for exit event on each worker.
*/
exitHandler?: ExitHandler<Worker>
+ /**
+ * Whether to start the minimum number of workers at pool initialization.
+ *
+ * @defaultValue false
+ */
+ startWorkers?: boolean
/**
* The worker choice strategy to use in this pool.
*
name?: string,
transferList?: TransferListItem[]
) => Promise<Response>
+ /**
+ * Starts the minimum number of workers in this pool.
+ */
+ readonly start: () => void
/**
* Terminates all workers in this pool.
*/
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
const { version } = require('../../../package.json')
const { waitPoolEvents } = require('../../test-utils')
+const { WorkerNode } = require('../../../lib/pools/worker-node')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
'./tests/worker-files/thread/testWorker.js'
)
expect(pool.emitter).toBeInstanceOf(EventEmitter)
- expect(pool.opts.enableEvents).toBe(true)
- expect(pool.opts.restartWorkerOnError).toBe(true)
- expect(pool.opts.enableTasksQueue).toBe(false)
- expect(pool.opts.tasksQueueOptions).toBeUndefined()
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.ROUND_ROBIN
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: true,
+ restartWorkerOnError: true,
+ enableTasksQueue: false,
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
elu: { median: false }
})
}
- expect(pool.opts.messageHandler).toBeUndefined()
- expect(pool.opts.errorHandler).toBeUndefined()
- expect(pool.opts.onlineHandler).toBeUndefined()
- expect(pool.opts.exitHandler).toBeUndefined()
await pool.destroy()
const testHandler = () => console.info('test handler executed')
pool = new FixedThreadPool(
}
)
expect(pool.emitter).toBeUndefined()
- expect(pool.opts.enableEvents).toBe(false)
- expect(pool.opts.restartWorkerOnError).toBe(false)
- expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({
- concurrency: 2,
- size: 4
- })
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.LEAST_USED
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: false,
+ restartWorkerOnError: false,
+ enableTasksQueue: true,
+ tasksQueueOptions: {
+ concurrency: 2,
+ size: 4,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
+ },
+ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: { 0: 300, 1: 200 }
+ },
+ onlineHandler: testHandler,
+ messageHandler: testHandler,
+ errorHandler: testHandler,
+ exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
weights: { 0: 300, 1: 200 }
})
}
- expect(pool.opts.messageHandler).toStrictEqual(testHandler)
- expect(pool.opts.errorHandler).toStrictEqual(testHandler)
- expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
- expect(pool.opts.exitHandler).toStrictEqual(testHandler)
await pool.destroy()
})
).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.js',
- {
- enableTasksQueue: true,
- tasksQueueOptions: { queueMaxSize: 2 }
- }
- )
- ).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(
() =>
new FixedThreadPool(
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.setTasksQueueOptions({ concurrency: 2 })
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ tasksStealing: true,
+ tasksStealingOnBackPressure: true
})
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
new RangeError(
'Invalid worker node tasks queue size: 0 is a negative integer or zero'
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: numberOfWorkers,
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: Math.floor(numberOfWorkers / 2),
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.usage).toStrictEqual({
tasks: {
executed: 0,
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.cluster,
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.thread,
await pool.destroy()
})
+ it('Verify that pool can be started after initialization', async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js',
+ {
+ startWorkers: false
+ }
+ )
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ expect(pool.workerNodes).toStrictEqual([])
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
+ )
+ pool.start()
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(pool.workerNodes.length).toBe(numberOfWorkers)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
+ }
+ await pool.destroy()
+ })
+
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
"Task function 'unknown' not found"
)
await pool.destroy()
- await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
- new Error('Cannot execute a task on destroyed pool')
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
)
})
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),