repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
feat: use priority queue for task queueing
[poolifier.git]
/
tests
/
pools
/
abstract-pool.test.mjs
diff --git
a/tests/pools/abstract-pool.test.mjs
b/tests/pools/abstract-pool.test.mjs
index 50656e85f37bfc4bfe60af9edb69881876729438..7740b079cc5b68e7117eb12af70201f6750f6f62 100644
(file)
--- a/
tests/pools/abstract-pool.test.mjs
+++ b/
tests/pools/abstract-pool.test.mjs
@@
-9,7
+9,6
@@
import { expect } from 'expect'
import { restore, stub } from 'sinon'
import { CircularArray } from '../../lib/circular-array.cjs'
import { restore, stub } from 'sinon'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
import {
DynamicClusterPool,
DynamicThreadPool,
@@
-21,6
+20,7
@@
import {
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
@@
-232,7
+232,7
@@
describe('Abstract pool test suite', () => {
enableTasksQueue: false,
workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
enableTasksQueue: false,
workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
y
Context
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
ies
Context
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
@@
-288,7
+288,7
@@
describe('Abstract pool test suite', () => {
errorHandler: testHandler,
exitHandler: testHandler
})
errorHandler: testHandler,
exitHandler: testHandler
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
y
Context
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
ies
Context
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
@@
-447,7
+447,7
@@
describe('Abstract pool test suite', () => {
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
y
Context
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
ies
Context
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
@@
-460,7
+460,7
@@
describe('Abstract pool test suite', () => {
})
}
expect(
})
}
expect(
- pool.workerChoiceStrateg
y
Context.getTaskStatisticsRequirements()
+ pool.workerChoiceStrateg
ies
Context.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
).toStrictEqual({
runTime: {
aggregate: true,
@@
-486,7
+486,7
@@
describe('Abstract pool test suite', () => {
runTime: { median: true },
elu: { median: true }
})
runTime: { median: true },
elu: { median: true }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
y
Context
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
ies
Context
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
@@
-499,7
+499,7
@@
describe('Abstract pool test suite', () => {
})
}
expect(
})
}
expect(
- pool.workerChoiceStrateg
y
Context.getTaskStatisticsRequirements()
+ pool.workerChoiceStrateg
ies
Context.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
).toStrictEqual({
runTime: {
aggregate: true,
@@
-525,7
+525,7
@@
describe('Abstract pool test suite', () => {
runTime: { median: false },
elu: { median: false }
})
runTime: { median: false },
elu: { median: false }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
y
Context
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrateg
ies
Context
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
@@
-538,7
+538,7
@@
describe('Abstract pool test suite', () => {
})
}
expect(
})
}
expect(
- pool.workerChoiceStrateg
y
Context.getTaskStatisticsRequirements()
+ pool.workerChoiceStrateg
ies
Context.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
).toStrictEqual({
runTime: {
aggregate: true,
@@
-706,7
+706,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.thread,
started: true,
ready: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
@@
-729,7
+729,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.cluster,
started: true,
ready: true,
worker: WorkerTypes.cluster,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
@@
-786,7
+786,7
@@
describe('Abstract pool test suite', () => {
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(
Deq
ue)
+ expect(workerNode.tasksQueue).toBeInstanceOf(
PriorityQue
ue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
@@
-798,7
+798,7
@@
describe('Abstract pool test suite', () => {
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(
Deq
ue)
+ expect(workerNode.tasksQueue).toBeInstanceOf(
PriorityQue
ue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
@@
-975,7
+975,7
@@
describe('Abstract pool test suite', () => {
await pool.destroy()
})
await pool.destroy()
})
- it(
'Verify that pool worker tasks usage are reset at worker choice strategy change'
, async () => {
+ it(
"Verify that pool worker tasks usage aren't reset at worker choice strategy change"
, async () => {
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
@@
-1026,7
+1026,7
@@
describe('Abstract pool test suite', () => {
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed:
0
,
+ executed:
expect.any(Number)
,
executing: 0,
queued: 0,
maxQueued: 0,
executing: 0,
queued: 0,
maxQueued: 0,
@@
-1049,6
+1049,10
@@
describe('Abstract pool test suite', () => {
}
}
})
}
}
})
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
expect(workerNode.usage.runTime.history.length).toBe(0)
expect(workerNode.usage.waitTime.history.length).toBe(0)
expect(workerNode.usage.elu.idle.history.length).toBe(0)
expect(workerNode.usage.runTime.history.length).toBe(0)
expect(workerNode.usage.waitTime.history.length).toBe(0)
expect(workerNode.usage.elu.idle.history.length).toBe(0)
@@
-1079,7
+1083,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.cluster,
started: true,
ready: true,
worker: WorkerTypes.cluster,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
@@
-1120,7
+1124,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.thread,
started: true,
ready: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
@@
-1160,7
+1164,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.thread,
started: true,
ready: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
@@
-1203,7
+1207,7
@@
describe('Abstract pool test suite', () => {
worker: WorkerTypes.thread,
started: true,
ready: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
-
s
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
+
defaultS
trategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
@@
-1372,20
+1376,33
@@
describe('Abstract pool test suite', () => {
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
])
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
])
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
const echoTaskFunction = data => {
return data
}
await expect(
const echoTaskFunction = data => {
return data
}
await expect(
- dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
).resolves.toBe(true)
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
- taskFunction: echoTaskFunction
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
})
})
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo' }
+ { name: 'echo'
, strategy: WorkerChoiceStrategies.LEAST_ELU
}
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
])
const taskFunctionData = { test: 'test' }
const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
@@
-1408,9
+1425,15
@@
describe('Abstract pool test suite', () => {
},
elu: {
idle: {
},
elu: {
idle: {
+ aggregate: 0,
+ maximum: 0,
+ minimum: 0,
history: new CircularArray()
},
active: {
history: new CircularArray()
},
active: {
+ aggregate: 0,
+ maximum: 0,
+ minimum: 0,
history: new CircularArray()
}
}
history: new CircularArray()
}
}
@@
-1436,21
+1459,34
@@
describe('Abstract pool test suite', () => {
const echoTaskFunction = data => {
return data
}
const echoTaskFunction = data => {
return data
}
- await dynamicThreadPool.addTaskFunction('echo', echoTaskFunction)
+ await dynamicThreadPool.addTaskFunction('echo', {
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
+ })
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
expect(dynamicThreadPool.taskFunctions.size).toBe(1)
expect(dynamicThreadPool.taskFunctions.get('echo')).toStrictEqual({
- taskFunction: echoTaskFunction
+ taskFunction: echoTaskFunction,
+ strategy: WorkerChoiceStrategies.LEAST_ELU
})
})
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([
+ WorkerChoiceStrategies.ROUND_ROBIN,
+ WorkerChoiceStrategies.LEAST_ELU
+ ])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' },
- { name: 'echo' }
+ { name: 'echo'
, strategy: WorkerChoiceStrategies.LEAST_ELU
}
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
])
await expect(dynamicThreadPool.removeTaskFunction('echo')).resolves.toBe(
true
)
expect(dynamicThreadPool.taskFunctions.size).toBe(0)
expect(dynamicThreadPool.taskFunctions.get('echo')).toBeUndefined()
+ expect([
+ ...dynamicThreadPool.workerChoiceStrategiesContext.workerChoiceStrategies.keys()
+ ]).toStrictEqual([WorkerChoiceStrategies.ROUND_ROBIN])
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
expect(dynamicThreadPool.listTaskFunctionsProperties()).toStrictEqual([
{ name: DEFAULT_TASK_NAME },
{ name: 'test' }
@@
-1458,7
+1494,7
@@
describe('Abstract pool test suite', () => {
await dynamicThreadPool.destroy()
})
await dynamicThreadPool.destroy()
})
- it('Verify that listTaskFunction
Nam
es() is working', async () => {
+ it('Verify that listTaskFunction
sProperti
es() is working', async () => {
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
const dynamicThreadPool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,