+ expect(poolInfo).toStrictEqual({
+ version,
+ type: PoolTypes.fixed,
+ worker: WorkerTypes.thread,
+ started: true,
+ ready: true,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: expect.any(Number),
+ maxSize: expect.any(Number),
+ workerNodes: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
+ })
+ await pool.destroy()
+ })
+
+ it("Verify that pool event emitter 'full' event can register a callback", async () => {
+ const pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js'
+ )
+ const promises = new Set()
+ let poolFull = 0
+ let poolInfo
+ pool.emitter.on(PoolEvents.full, info => {
+ ++poolFull
+ poolInfo = info
+ })
+ for (let i = 0; i < numberOfWorkers * 2; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(poolFull).toBe(1)
+ expect(poolInfo).toStrictEqual({
+ version,
+ type: PoolTypes.dynamic,
+ worker: WorkerTypes.thread,
+ started: true,
+ ready: true,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: expect.any(Number),
+ maxSize: expect.any(Number),
+ workerNodes: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
+ })
+ await pool.destroy()
+ })
+
+ it("Verify that pool event emitter 'backPressure' event can register a callback", async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ enableTasksQueue: true
+ }
+ )
+ sinon.stub(pool, 'hasBackPressure').returns(true)
+ const promises = new Set()
+ let poolBackPressure = 0
+ let poolInfo
+ pool.emitter.on(PoolEvents.backPressure, info => {
+ ++poolBackPressure
+ poolInfo = info
+ })
+ for (let i = 0; i < numberOfWorkers + 1; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(poolBackPressure).toBe(1)
+ expect(poolInfo).toStrictEqual({
+ version,
+ type: PoolTypes.fixed,
+ worker: WorkerTypes.thread,
+ started: true,
+ ready: true,
+ strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ minSize: expect.any(Number),
+ maxSize: expect.any(Number),
+ workerNodes: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ busyWorkerNodes: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ maxQueuedTasks: expect.any(Number),
+ queuedTasks: expect.any(Number),
+ backPressure: true,
+ stolenTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
+ })
+ expect(pool.hasBackPressure.called).toBe(true)
+ await pool.destroy()
+ })
+
+ it('Verify that listTaskFunctions() is working', async () => {
+ const dynamicThreadPool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testMultipleTaskFunctionsWorker.js'
+ )
+ await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'jsonIntegerSerialization',
+ 'factorial',
+ 'fibonacci'
+ ])
+ const fixedClusterPool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ )
+ await waitPoolEvents(fixedClusterPool, PoolEvents.ready, 1)
+ expect(fixedClusterPool.listTaskFunctionNames()).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'jsonIntegerSerialization',
+ 'factorial',
+ 'fibonacci'
+ ])
+ await dynamicThreadPool.destroy()
+ await fixedClusterPool.destroy()
+ })
+
+ it('Verify that multiple task functions worker is working', async () => {
+ const pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/cluster/testMultipleTaskFunctionsWorker.js'
+ )
+ const data = { n: 10 }
+ const result0 = await pool.execute(data)
+ expect(result0).toStrictEqual({ ok: 1 })
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toStrictEqual({ ok: 1 })
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(55)
+ expect(pool.info.executingTasks).toBe(0)
+ expect(pool.info.executedTasks).toBe(4)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info.taskFunctionNames).toStrictEqual([
+ DEFAULT_TASK_NAME,
+ 'jsonIntegerSerialization',
+ 'factorial',
+ 'fibonacci'
+ ])
+ expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ for (const name of pool.listTaskFunctionNames()) {
+ expect(workerNode.getTaskFunctionWorkerUsage(name)).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ failed: 0,
+ queued: 0,
+ stolen: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(name).tasks.executed
+ ).toBeGreaterThan(0)
+ }
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+ ).toStrictEqual(
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionNames[1]
+ )
+ )
+ }
+ await pool.destroy()
+ })
+
+ it('Verify sendKillMessageToWorker()', async () => {
+ const pool = new DynamicClusterPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ const workerNodeKey = 0
+ await expect(
+ pool.sendKillMessageToWorker(
+ workerNodeKey,
+ pool.workerNodes[workerNodeKey].info.id
+ )
+ ).resolves.toBeUndefined()