From 1c02490bb950dbb6dc413f9cb7c36aae8fb4a254 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 1 Jul 2025 16:07:01 +0200 Subject: [PATCH] fix: ensure using task abortion API is not altering operations MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 2 +- tests/pools/abstract-pool.test.mjs | 16 ++++++++++++---- tests/pools/cluster/dynamic.test.mjs | 20 ++++++++++++++------ tests/pools/cluster/fixed.test.mjs | 20 ++++++++++++++------ tests/pools/thread/dynamic.test.mjs | 20 ++++++++++++++------ tests/pools/thread/fixed.test.mjs | 20 ++++++++++++++------ 6 files changed, 69 insertions(+), 29 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 695861966..60f350645 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2011,7 +2011,7 @@ export abstract class AbstractPool< abortSignal?.addEventListener( 'abort', () => { - this.workerNodes[workerNodeKey].emit('abortTask', { + this.workerNodes[workerNodeKey]?.emit('abortTask', { taskId: task.taskId, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workerId: this.getWorkerInfo(workerNodeKey)!.id!, diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 542e491c1..2887aba77 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1950,15 +1950,17 @@ describe('Abstract pool test suite', () => { new Error("Task function 'unknown' not found") ) let results = await pool.mapExecute( - [{}, {}, {}, {}], - 'jsonIntegerSerialization' + Array(4).fill({}), + 'jsonIntegerSerialization', + Array(4).fill(AbortSignal.timeout(1000)) ) expect(results).toStrictEqual([{ ok: 1 }, { ok: 1 }, { ok: 1 }, { ok: 1 }]) expect(pool.info.executingTasks).toBe(0) expect(pool.info.executedTasks).toBe(4) results = await pool.mapExecute( [{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }], - 'factorial' + 'factorial', + Array(4).fill(AbortSignal.timeout(1000)) ) expect(results).toStrictEqual([ 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47, @@ -1967,7 +1969,13 @@ describe('Abstract pool test suite', () => { expect(pool.info.executedTasks).toBe(8) results = await pool.mapExecute( new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]), - 'factorial' + 'factorial', + new Set([ + AbortSignal.timeout(1000), + AbortSignal.timeout(1500), + AbortSignal.timeout(2000), + AbortSignal.timeout(2500), + ]) ) expect(results).toStrictEqual([ 3628800, 2432902008176640000, 2.6525285981219103e32, 8.159152832478977e47, diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index e1b386a47..48e2fed45 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -25,13 +25,21 @@ describe('Dynamic cluster pool test suite', () => { }) it('Verify that the function is executed in a worker cluster', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(354224848179262000000) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(9.33262154439441e157) }) diff --git a/tests/pools/cluster/fixed.test.mjs b/tests/pools/cluster/fixed.test.mjs index e41a8a6fc..a3f69131a 100644 --- a/tests/pools/cluster/fixed.test.mjs +++ b/tests/pools/cluster/fixed.test.mjs @@ -70,13 +70,21 @@ describe('Fixed cluster pool test suite', () => { }) it('Verify that the function is executed in a worker cluster', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(354224848179262000000) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(9.33262154439441e157) }) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index d78ebbd94..da4d5eae8 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -25,13 +25,21 @@ describe('Dynamic thread pool test suite', () => { }) it('Verify that the function is executed in a worker thread', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(354224848179262000000) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(9.33262154439441e157) }) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index ac7937095..f83147c66 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -69,13 +69,21 @@ describe('Fixed thread pool test suite', () => { }) it('Verify that the function is executed in a worker thread', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(354224848179262000000) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000) + ) expect(result).toBe(9.33262154439441e157) }) -- 2.43.0