import { expect } from 'expect'
import { restore, stub } from 'sinon'
-import { CircularArray } from '../../lib/circular-array.cjs'
+import { CircularBuffer } from '../../lib/circular-buffer.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
-import { PriorityQueue } from '../../lib/priority-queue.cjs'
+import { defaultBucketSize, PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(true, { concurrency: 2 })
concurrency: 2,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
pool.enableTasksQueue(false)
concurrency: 1,
size: Math.pow(numberOfWorkers, 2),
taskStealing: true,
- tasksStealingOnBackPressure: true,
+ tasksStealingOnBackPressure: false,
tasksFinishedTimeout: 2000
})
for (const workerNode of pool.workerNodes) {
failed: 0
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
},
active: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
}
}
})
expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
pool = new DynamicThreadPool(
expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
+ expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
})
type: WorkerTypes.cluster,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
type: WorkerTypes.thread,
dynamic: false,
ready: true,
- stealing: false
+ stealing: false,
+ backPressure: false
})
}
await pool.destroy()
failed: 0
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
active: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}
}
})
failed: 0
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
active: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}
}
})
failed: 0
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
active: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}
}
})
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
numberOfWorkers * maxMultiplier
)
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
failed: 0
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
active: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}
}
})
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
numberOfWorkers * maxMultiplier
)
- expect(workerNode.usage.runTime.history.length).toBe(0)
- expect(workerNode.usage.waitTime.history.length).toBe(0)
- expect(workerNode.usage.elu.idle.history.length).toBe(0)
- expect(workerNode.usage.elu.active.history.length).toBe(0)
}
await pool.destroy()
})
failed: 0
},
runTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: new CircularArray()
+ history: expect.any(CircularBuffer)
},
elu: expect.objectContaining({
idle: expect.objectContaining({
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}),
active: expect.objectContaining({
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
})
})
})
{ name: 'fibonacci' }
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
+ for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ ).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ failed: 0,
+ queued: 0,
+ sequentiallyStolen: 0,
+ stolen: 0
+ },
+ runTime: {
+ history: expect.any(CircularBuffer)
+ },
+ waitTime: {
+ history: expect.any(CircularBuffer)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularBuffer)
+ },
+ active: {
+ history: expect.any(CircularBuffer)
+ }
+ }
+ })
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
+ .tasks.executed
+ ).toBeGreaterThan(0)
+ }
+ expect(
+ workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+ ).toStrictEqual(
+ workerNode.getTaskFunctionWorkerUsage(
+ workerNode.info.taskFunctionsProperties[1].name
+ )
+ )
+ }
+ await pool.destroy()
+ })
+
+ it('Verify that task function objects worker is working', async () => {
+ const pool = new DynamicThreadPool(
+ Math.floor(numberOfWorkers / 2),
+ numberOfWorkers,
+ './tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs'
+ )
+ const data = { n: 10 }
+ const result0 = await pool.execute(data)
+ expect(result0).toStrictEqual({ ok: 1 })
+ const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+ expect(result1).toStrictEqual({ ok: 1 })
+ const result2 = await pool.execute(data, 'factorial')
+ expect(result2).toBe(3628800)
+ const result3 = await pool.execute(data, 'fibonacci')
+ expect(result3).toBe(55)
+ expect(pool.info.executingTasks).toBe(0)
+ expect(pool.info.executedTasks).toBe(4)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.info.taskFunctionsProperties).toStrictEqual([
+ { name: DEFAULT_TASK_NAME },
+ { name: 'jsonIntegerSerialization' },
+ { name: 'factorial' },
+ { name: 'fibonacci', priority: -5 }
+ ])
+ expect(workerNode.taskFunctionsUsage.size).toBe(3)
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(true)
for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
stolen: 0
},
runTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
waitTime: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
elu: {
idle: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
},
active: {
- history: expect.any(CircularArray)
+ history: expect.any(CircularBuffer)
}
}
})