})
it("Verify that pool event emitter 'full' event can register a callback", async () => {
- const pool = new DynamicThreadPool(
+ const pool = new DynamicClusterPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs'
+ './tests/worker-files/cluster/testWorker.cjs'
)
expect(pool.emitter.eventNames()).toStrictEqual([])
const promises = new Set()
strategyRetries: expect.any(Number),
type: PoolTypes.dynamic,
version,
- worker: WorkerTypes.thread,
+ worker: WorkerTypes.cluster,
workerNodes: expect.any(Number),
})
await pool.destroy()
await pool.destroy()
})
+ it("Verify that pool event emitter 'empty' event can register a callback", async () => {
+ const pool = new DynamicClusterPool(
+ 0,
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.cjs'
+ )
+ expect(pool.emitter.eventNames()).toStrictEqual([])
+ const promises = new Set()
+ let poolEmpty = 0
+ let poolInfo
+ pool.emitter.on(PoolEvents.empty, info => {
+ ++poolEmpty
+ poolInfo = info
+ })
+ expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.empty])
+ for (let i = 0; i < numberOfWorkers * 2; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ await waitPoolEvents(pool, PoolEvents.empty, 1)
+ expect(poolEmpty).toBe(1)
+ expect(poolInfo).toStrictEqual({
+ busyWorkerNodes: expect.any(Number),
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
+ failedTasks: expect.any(Number),
+ idleWorkerNodes: expect.any(Number),
+ maxSize: expect.any(Number),
+ minSize: expect.any(Number),
+ ready: true,
+ started: true,
+ strategyRetries: expect.any(Number),
+ type: PoolTypes.dynamic,
+ version,
+ worker: WorkerTypes.cluster,
+ workerNodes: expect.any(Number),
+ })
+ await pool.destroy()
+ })
+
it('Verify that destroy() waits for queued tasks to finish', async () => {
const tasksFinishedTimeout = 2500
const pool = new FixedThreadPool(
import { FixedClusterPool, PoolEvents } from '../../../lib/index.cjs'
import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
import { TaskFunctions } from '../../test-types.cjs'
-import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
+import { waitWorkerEvents } from '../../test-utils.cjs'
describe('Fixed cluster pool test suite', () => {
const numberOfWorkers = 8
expect(result).toStrictEqual({ ok: 1 })
})
- it("Verify that 'ready' event is emitted", async () => {
- const pool = new FixedClusterPool(
- numberOfWorkers,
- './tests/worker-files/cluster/testWorker.cjs',
- {
- errorHandler: e => console.error(e),
- }
- )
- expect(pool.emitter.eventNames()).toStrictEqual([])
- let poolReady = 0
- pool.emitter.on(PoolEvents.ready, () => ++poolReady)
- await waitPoolEvents(pool, PoolEvents.ready, 1)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
- expect(poolReady).toBe(1)
- await pool.destroy()
- })
-
- it("Verify that 'busy' event is emitted", async () => {
- const promises = new Set()
- expect(pool.emitter.eventNames()).toStrictEqual([])
- let poolBusy = 0
- pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
- for (let i = 0; i < numberOfWorkers * 2; i++) {
- promises.add(pool.execute())
- }
- await Promise.all(promises)
- // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
- // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
- expect(poolBusy).toBe(numberOfWorkers + 1)
- })
-
it('Verify that tasks queuing is working', async () => {
const promises = new Set()
const maxMultiplier = 3 // Must be greater than tasksConcurrency
it('Shutdown test', async () => {
const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
+ expect(pool.emitter.eventNames()).toStrictEqual([])
let poolDestroy = 0
pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
- expect(pool.emitter.eventNames()).toStrictEqual([
- PoolEvents.busy,
- PoolEvents.destroy,
- ])
+ expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(pool.info.started).toBe(false)
- expect(pool.emitter.eventNames()).toStrictEqual([
- PoolEvents.busy,
- PoolEvents.destroy,
- ])
+ expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(numberOfWorkers)
import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
import { TaskFunctions } from '../../test-types.cjs'
-import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
+import { waitWorkerEvents } from '../../test-utils.cjs'
describe('Fixed thread pool test suite', () => {
const numberOfThreads = 6
expect(result).toStrictEqual({ ok: 1 })
})
- it("Verify that 'ready' event is emitted", async () => {
- const pool = new FixedThreadPool(
- numberOfThreads,
- './tests/worker-files/thread/testWorker.mjs',
- {
- errorHandler: e => console.error(e),
- }
- )
- expect(pool.emitter.eventNames()).toStrictEqual([])
- let poolReady = 0
- pool.emitter.on(PoolEvents.ready, () => ++poolReady)
- await waitPoolEvents(pool, PoolEvents.ready, 1)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
- expect(poolReady).toBe(1)
- await pool.destroy()
- })
-
- it("Verify that 'busy' event is emitted", async () => {
- const promises = new Set()
- expect(pool.emitter.eventNames()).toStrictEqual([])
- let poolBusy = 0
- pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
- for (let i = 0; i < numberOfThreads * 2; i++) {
- promises.add(pool.execute())
- }
- await Promise.all(promises)
- // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
- // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
- expect(poolBusy).toBe(numberOfThreads + 1)
- })
-
it('Verify that tasks queuing is working', async () => {
const promises = new Set()
const maxMultiplier = 3 // Must be greater than tasksConcurrency
it('Shutdown test', async () => {
const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
- expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
+ expect(pool.emitter.eventNames()).toStrictEqual([])
let poolDestroy = 0
pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
- expect(pool.emitter.eventNames()).toStrictEqual([
- PoolEvents.busy,
- PoolEvents.destroy,
- ])
+ expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
await pool.destroy()
const numberOfExitEvents = await exitPromise
expect(pool.info.started).toBe(false)
- expect(pool.emitter.eventNames()).toStrictEqual([
- PoolEvents.busy,
- PoolEvents.destroy,
- ])
+ expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes.length).toBe(0)
expect(numberOfExitEvents).toBe(numberOfThreads)