+ expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7)
+ await pool.destroy()
+ })
+
+ it('Verify that destroy() waits for queued tasks to finish', async () => {
+ const tasksFinishedTimeout = 2500
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBeLessThanOrEqual(numberOfWorkers * maxMultiplier)
+ expect(elapsedTime).toBeGreaterThanOrEqual(2000)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
+ })
+
+ it('Verify that destroy() waits until the tasks finished timeout is reached', async () => {
+ const tasksFinishedTimeout = 1000
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/asyncWorker.mjs',
+ {
+ enableTasksQueue: true,
+ tasksQueueOptions: { tasksFinishedTimeout }
+ }
+ )
+ const maxMultiplier = 4
+ let tasksFinished = 0
+ for (const workerNode of pool.workerNodes) {
+ workerNode.on('taskFinished', () => {
+ ++tasksFinished
+ })
+ }
+ for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+ pool.execute()
+ }
+ expect(pool.info.queuedTasks).toBeGreaterThan(0)
+ const startTime = performance.now()
+ await pool.destroy()
+ const elapsedTime = performance.now() - startTime
+ expect(tasksFinished).toBe(0)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
+ })
+
+ it('Verify that pool asynchronous resource track tasks execution', async () => {
+ let taskAsyncId
+ let initCalls = 0
+ let beforeCalls = 0
+ let afterCalls = 0
+ let resolveCalls = 0
+ const hook = createHook({
+ init (asyncId, type) {
+ if (type === 'poolifier:task') {
+ initCalls++
+ taskAsyncId = asyncId
+ }
+ },
+ before (asyncId) {
+ if (asyncId === taskAsyncId) beforeCalls++
+ },
+ after (asyncId) {
+ if (asyncId === taskAsyncId) afterCalls++
+ },
+ promiseResolve () {
+ if (executionAsyncId() === taskAsyncId) resolveCalls++
+ }
+ })
+ const pool = new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ hook.enable()
+ await pool.execute()
+ hook.disable()
+ expect(initCalls).toBe(1)
+ expect(beforeCalls).toBe(1)
+ expect(afterCalls).toBe(1)
+ expect(resolveCalls).toBe(1)