import { dirname, join } from 'node:path'
import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'node:url'
+import { createHook, executionAsyncId } from 'node:async_hooks'
import { expect } from 'expect'
import { restore, stub } from 'sinon'
import {
restore()
})
- it('Simulate pool creation from a non main thread/process', () => {
+ it('Verify that pool can be created and destroyed', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool).toBeInstanceOf(FixedThreadPool)
+ await pool.destroy()
+ })
+
+ it('Verify that pool cannot be created from a non main thread/process', () => {
expect(
() =>
new StubPoolWithIsMain(
numberOfWorkers,
'./tests/worker-files/thread/testWorker.mjs'
)
- expect(pool.starting).toBe(false)
expect(pool.started).toBe(true)
+ expect(pool.starting).toBe(false)
+ expect(pool.destroying).toBe(false)
await pool.destroy()
})
it('Verify that filePath is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
- new Error("Cannot find the worker file 'undefined'")
+ new TypeError('The worker file path must be specified')
+ )
+ expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+ new TypeError('The worker file path must be a string')
)
expect(
() => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
await pool.destroy()
})
+ it('Verify that pool statuses are checked at start or destroy', async () => {
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(() => pool.start()).toThrow(
+ new Error('Cannot start an already started pool')
+ )
+ await pool.destroy()
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ await expect(pool.destroy()).rejects.toThrow(
+ new Error('Cannot destroy an already destroyed pool')
+ )
+ })
+
it('Verify that pool can be started after initialization', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
)
expect(pool.info.started).toBe(false)
expect(pool.info.ready).toBe(false)
+ expect(pool.readyEventEmitted).toBe(false)
expect(pool.workerNodes).toStrictEqual([])
await expect(pool.execute()).rejects.toThrow(
new Error('Cannot execute a task on not started pool')
pool.start()
expect(pool.info.started).toBe(true)
expect(pool.info.ready).toBe(true)
+ await waitPoolEvents(pool, PoolEvents.ready, 1)
+ expect(pool.readyEventEmitted).toBe(true)
expect(pool.workerNodes.length).toBe(numberOfWorkers)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
executing: maxMultiplier,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
stolenTasks: expect.any(Number),
failedTasks: expect.any(Number)
})
- expect(pool.hasBackPressure.called).toBe(true)
+ expect(pool.hasBackPressure.callCount).toBe(5)
+ await pool.destroy()
+ })
+
+ it('Verify that pool asynchronous resource track tasks execution', async () => {
+ let taskAsyncId
+ let initCalls = 0
+ let beforeCalls = 0
+ let afterCalls = 0
+ let resolveCalls = 0
+ const hook = createHook({
+ init (asyncId, type) {
+ if (type === 'poolifier:task') {
+ initCalls++
+ taskAsyncId = asyncId
+ }
+ },
+ before (asyncId) {
+ if (asyncId === taskAsyncId) beforeCalls++
+ },
+ after (asyncId) {
+ if (asyncId === taskAsyncId) afterCalls++
+ },
+ promiseResolve () {
+ if (executionAsyncId() === taskAsyncId) resolveCalls++
+ }
+ })
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ hook.enable()
+ await pool.execute()
+ hook.disable()
+ expect(initCalls).toBe(1)
+ expect(beforeCalls).toBe(1)
+ expect(afterCalls).toBe(1)
+ expect(resolveCalls).toBe(1)
await pool.destroy()
})
executed: expect.any(Number),
executing: 0,
queued: 0,
+ sequentiallyStolen: 0,
stolen: 0,
failed: 0
},
'./tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs'
)
await waitPoolEvents(dynamicThreadPool, PoolEvents.ready, 1)
+ const workerId = dynamicThreadPool.workerNodes[0].info.id
await expect(dynamicThreadPool.setDefaultTaskFunction(0)).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'TypeError: name parameter is not a string'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'TypeError: name parameter is not a string'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction(DEFAULT_TASK_NAME)
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function reserved name as the default task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function reserved name as the default task function'`
)
)
await expect(
dynamicThreadPool.setDefaultTaskFunction('unknown')
).rejects.toThrow(
new Error(
- "Task function operation 'default' failed on worker 31 with error: 'Error: Cannot set the default task function to a non-existing task function'"
+ `Task function operation 'default' failed on worker ${workerId} with error: 'Error: Cannot set the default task function to a non-existing task function'`
)
)
expect(dynamicThreadPool.listTaskFunctionNames()).toStrictEqual([
executing: 0,
failed: 0,
queued: 0,
+ sequentiallyStolen: 0,
stolen: 0
},
runTime: {