enableTasksQueue: true,
tasksQueueOptions: {
concurrency: 2,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
},
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}
pool.setTasksQueueOptions({
concurrency: 2,
+ size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4,
+ size: 2,
taskStealing: false,
tasksStealingOnBackPressure: false
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeUndefined()
expect(workerNode.onBackPressure).toBeUndefined()
}
})
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4,
+ size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
tasksStealingOnBackPressure: true
})
for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueueBackPressureSize).toBe(
+ pool.opts.tasksQueueOptions.size
+ )
expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
expect(workerNode.onBackPressure).toBeInstanceOf(Function)
}
await pool.destroy()
})
- it('Verify that listTaskFunctions() is working', async () => {
+ it('Verify that hasTaskFunction() is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ expect(dynamicThreadPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
+ expect(dynamicThreadPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
+ true
+ )
+ expect(dynamicThreadPool.hasTaskFunction('factorial')).toBe(true)
+ expect(dynamicThreadPool.hasTaskFunction('fibonacci')).toBe(true)
+ expect(dynamicThreadPool.hasTaskFunction('unknown')).toBe(false)
+ await dynamicThreadPool.destroy()
+ const fixedClusterPool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ )
+ await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
+ expect(fixedClusterPool.hasTaskFunction(DEFAULT_TASK_NAME)).toBe(true)
+ expect(fixedClusterPool.hasTaskFunction('jsonIntegerSerialization')).toBe(
+ true
+ )
+ expect(fixedClusterPool.hasTaskFunction('factorial')).toBe(true)
+ expect(fixedClusterPool.hasTaskFunction('fibonacci')).toBe(true)
+ expect(fixedClusterPool.hasTaskFunction('unknown')).toBe(false)
+ await fixedClusterPool.destroy()
+ })
+
+ it('Verify that addTaskFunction() is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ await expect(
+ dynamicThreadPool.addTaskFunction(0, () => {})
+ ).rejects.toThrowError(new TypeError('name argument must be a string'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('', () => {})
+ ).rejects.toThrowError(
+ new TypeError('name argument must not be an empty string')
+ )
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', 0)
+ ).rejects.toThrowError(new TypeError('fn argument must be a function'))
+ await expect(
+ dynamicThreadPool.addTaskFunction('test', '')
+ ).rejects.toThrowError(new TypeError('fn argument must be a function'))
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test'
+ ])
+ const echoTaskFunction = data => {
+ return data
+ }
+ await expect(
+ dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ ).resolves.toBe(true)
+ expect(dynamicThreadPool.taskFunctions.size).toBe(1)
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
+ echoTaskFunction
+ )
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test',
+ 'echo'
+ ])
+ const taskFunctionData = { test: 'test' }
+ const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
+ expect(echoResult).toStrictEqual(taskFunctionData)
+ await dynamicThreadPool.destroy()
+ })
+
+ it('Verify that removeTaskFunction() is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test'
+ ])
+ await expect(
+ dynamicThreadPool.removeTaskFunction('test')
+ ).rejects.toThrowError(
+ new Error('Cannot remove a task function not handled on the pool side')
+ )
+ const echoTaskFunction = data => {
+ return data
+ }
+ await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ expect(dynamicThreadPool.taskFunctions.size).toBe(1)
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual(
+ echoTaskFunction
+ )
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test',
+ 'echo'
+ ])
+ await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
+ true
+ )
+ expect(dynamicThreadPool.taskFunctions.size).toBe(0)
+ expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'test'
+ ])
+ await dynamicThreadPool.destroy()
+ })
+
+ it('Verify that listTaskFunctionNames() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
- expect(dynamicThreadPool.listTaskFunctions()).toStrictEqual([
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'fibonacci'
])
+ await dynamicThreadPool.destroy()
const fixedClusterPool = new FixedClusterPool(
numberOfWorkers,
'./tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
)
await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
- expect(fixedClusterPool.listTaskFunctions()).toStrictEqual([
+ expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'fibonacci'
])
- await dynamicThreadPool.destroy()
await fixedClusterPool.destroy()
})
+ it('Verify that setDefaultTaskFunction() is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'jsonIntegerSerialization',
+ 'factorial',
+ 'fibonacci'
+ ])
+ await expect(
+ dynamicThreadPool.setDefaultTaskFunction('factorial')
+ ).resolves.toBe(true)
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'factorial',
+ 'jsonIntegerSerialization',
+ 'fibonacci'
+ ])
+ await expect(
+ dynamicThreadPool.setDefaultTaskFunction('fibonacci')
+ ).resolves.toBe(true)
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'fibonacci',
+ 'jsonIntegerSerialization',
+ 'factorial'
+ ])
+ })
+
it('Verify that multiple task functions worker is working', async () => {
const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
expect(pool.info.executingTasks).toBe(0)
expect(pool.info.executedTasks).toBe(4)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.info.taskFunctions).toStrictEqual([
+ expect(workerNode.info.taskFunctionNames).toStrictEqual([
DEFAULT_TASK_NAME,
'jsonIntegerSerialization',
'factorial',
'fibonacci'
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
- for (const name of pool.listTaskFunctions()) {
+ for (const name of pool.listTaskFunctionNames()) {
expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
tasks: {
executed: expect.any(Number),
expect(
workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual(
- workerNode.getTaskFunctionWorkerUsage(workerNode.info.taskFunctions[1])
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionNames[1]
+ )
)
}
await pool.destroy()