Piment Noir Git Repositories - poolifier.git/commitdiff
feat!: add abortable task support (#1625)
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 1 Jul 2025 13:24:02 +0000 (15:24 +0200)
committer GitHub <noreply@github.com>
Tue, 1 Jul 2025 13:24:02 +0000 (15:24 +0200)
* refactor: make worker a first class event emitter

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs: fix API documentation

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs: move changelog entry to the beginning

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* build(ci): silence linter errors

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* build: fix linter errors

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* build: silence linter error

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* feat: add delete op to priority queue

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: add priority queue delete() test

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs: add missing @param to workerNode.deleteTask()

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs: add code comment

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: silence linter

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: fix queues delete() implementation

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: fix delete() in fixed queue

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* Update src/priority-queue.ts

* Update src/priority-queue.ts

* Update src/priority-queue.ts

* test: improve priority queue delete() coverage

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* build(ci): silence linting errors

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code reformatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code reformatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code formatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: fix merge issues

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs(api): formatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: merge eslint-plugin-perfectionist reformatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* [autofix.ci] apply automated fixes

* chore: fix mismerge

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: fix CHANGELOG.md mismerge

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* chore: silence linter

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: ensure abort signal is defined before usage

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: code formatting

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* refactor: make mapExecute() properly handle one abort signal per task data

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: propagate abort signal error when possible

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: improve queue delete() coverage

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: add abort error UT

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* test: add tests for tasks abortion

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: fix taskId type definition

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* refactor: cleanup AbortError properties

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* docs(README.md): refinement

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: ensure task abortion plays nice with task stealing

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* fix: treat cascaded tasks abortion as legitimate

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
---------

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
Co-authored-by: Jérôme Benoit <jerome.benoit@sap.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
32 files changed:
.vscode/settings.json
README.md
docs/api.md
examples/typescript/http-server-pool/fastify-hybrid/@types/fastify/index.d.ts
examples/typescript/http-server-pool/fastify-hybrid/src/fastify-poolifier.ts
examples/typescript/http-server-pool/fastify-worker_threads/@types/fastify/index.d.ts
examples/typescript/http-server-pool/fastify-worker_threads/src/fastify-poolifier.ts
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/utils.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/queues/abstract-fixed-queue.ts
src/queues/priority-queue.ts
src/queues/queue-types.ts
src/utility-types.ts
src/worker/abort-error.ts [new file with mode: 0644]
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
src/worker/worker-types.ts [new file with mode: 0644]
tests/pools/abstract-pool.test.mjs
tests/pools/cluster/fixed.test.mjs
tests/pools/thread/fixed.test.mjs
tests/pools/utils.test.mjs
tests/pools/worker-node.test.mjs
tests/queues/fixed-priority-queue.test.mjs
tests/queues/fixed-queue.test.mjs
tests/queues/priority-queue.test.mjs
tests/worker/abort-error.test.mjs [new file with mode: 0644]
tests/worker/cluster-worker.test.mjs
tests/worker/thread-worker.test.mjs

index 510af97ac124e64633646d8b876d40c3acfd4912..e9b64b23b2232c26e776764372a0b0f6f59281aa 100644 (file)
@@ -3,6 +3,7 @@
     "source.fixAll": "explicit"
   },
   "cSpell.words": [
+    "abortable",
     "Alessandro",
     "Ardizio",
     "autobuild",
index ddaa511dd29b16a5b92058a56902f348f3d237f6..2bd85571e03ddbce0aeba049a56837c22cd5f372 100644 (file)
--- a/README.md
+++ b/README.md
@@ -46,6 +46,7 @@ Please consult our [general guidelines](#general-guidelines).
   - Tasks stealing under back pressure :white_check_mark:
   - Tasks redistribution on worker error :white_check_mark:
 - Support for sync and async task function :white_check_mark:
+- Support for abortable task function :white_check_mark:
 - Support for multiple task functions with per task function queuing priority and tasks distribution strategy :white_check_mark:
 - Support for task functions [CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) operations at runtime :white_check_mark:
 - General guidelines on pool choice :white_check_mark:
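A minimal usage sketch of the new abortable task support (the worker file path, task data, and task name are placeholders; the `execute` signature is documented in the API changes below):

import { availableParallelism, FixedThreadPool } from 'poolifier'

// Hypothetical worker file exporting a ThreadWorker instance.
const pool = new FixedThreadPool(availableParallelism(), './dist/worker.js')

try {
  // Abort the task if it has not completed within one second.
  const response = await pool.execute({ input: 42 }, 'default', AbortSignal.timeout(1000))
  console.info(response)
} catch (error) {
  // On abortion, the promise rejects with the abort reason (here a TimeoutError).
  console.error(error)
}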
index 12b552af2a48f491f60a924bd8951c32467dc999..79243daa2728d1146910b22611f038cb7466436d 100644 (file)
@@ -5,8 +5,8 @@
 - [Pool](#pool)
   - [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
   - [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
-  - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
-  - [`pool.mapExecute(data, name, transferList)`](#poolmapexecutedata-name-transferlist)
+  - [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist)
+  - [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist)
   - [`pool.start()`](#poolstart)
   - [`pool.destroy()`](#pooldestroy)
   - [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname)
 `filePath` (mandatory) Path to a file with a worker implementation.  
 `opts` (optional) An object with the pool options properties described below.
 
-### `pool.execute(data, name, transferList)`
+### `pool.execute(data, name, abortSignal, transferList)`
 
 `data` (optional) An object that you want to pass to your worker task function implementation.  
 `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`  
+`abortSignal` (optional) An abort signal to abort the task function execution.  
 `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation.
 
 This method is available on both pool implementations and returns a promise with the task function execution response.
 
-### `pool.mapExecute(data, name, transferList)`
+### `pool.mapExecute(data, name, abortSignals, transferList)`
 
-`data` Iterable objects that you want to pass to your worker task function implementation.  
+`data` An iterable of objects that you want to pass to your worker task function implementation.  
 `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`  
+`abortSignals` (optional) An iterable of AbortSignal, one per data item, to abort the matching task function execution.  
 `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworkerclusterworker) worker implementation.
 
 This method is available on both pool implementations and returns a promise with the task function execution responses array.
@@ -105,7 +107,6 @@ An object with these properties:
   Default: `() => {}`
 
 - `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool:
-
   - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
   - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executing and queued tasks
   - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks execution time
@@ -119,7 +120,6 @@ An object with these properties:
 
 - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.  
   Properties:
-
   - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
   - `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) runtime instead of the tasks simple moving average runtime in worker choice strategies.
   - `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md) wait time instead of the tasks simple moving average wait time in worker choice strategies.
@@ -139,7 +139,6 @@ An object with these properties:
 
 - `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool.  
   Properties:
-
   - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
   - `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
   - `taskStealing` (optional) - Task stealing enablement on idle.
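A hedged sketch of `mapExecute()` with per-task abort signals (task data and worker file path are placeholders); each signal maps positionally to the data item at the same index, and both iterables must have the same length:

import { availableParallelism, FixedThreadPool } from 'poolifier'

const pool = new FixedThreadPool(availableParallelism(), './dist/worker.js') // worker path illustrative
const controllers = [new AbortController(), new AbortController()]
try {
  const responses = await pool.mapExecute(
    [{ n: 1 }, { n: 2 }],
    'default',
    controllers.map(controller => controller.signal)
  )
  console.info(responses)
} catch (error) {
  // mapExecute() awaits all tasks, so aborting any one of them rejects the returned promise.
  console.error(error)
}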
index 2ffda708537ae6797a32df215e054130a0d3b5db..5d61b6eaf707c2ec71e84089fe4184ecbf89c1d6 100644 (file)
@@ -9,11 +9,13 @@ declare module 'fastify' {
     execute: (
       data?: ThreadWorkerData,
       name?: string,
+      abortSignal?: AbortSignal,
       transferList?: readonly Transferable[]
     ) => Promise<ThreadWorkerResponse>
     mapExecute: (
       data: Iterable<ThreadWorkerData>,
       name?: string,
+      abortSignals?: Iterable<AbortSignal>,
       transferList?: readonly Transferable[]
     ) => Promise<ThreadWorkerResponse[]>
     pool: DynamicThreadPool<ThreadWorkerData, ThreadWorkerResponse>
index 9808af10aaceb26de13e056be47e54a6eb824747..ef247f7cd4d1c9224d6948432425b0e9b61fa552 100644 (file)
@@ -40,9 +40,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback<FastifyPoolifierOptions> = (
       async (
         data?: ThreadWorkerData,
         name?: string,
+        abortSignal?: AbortSignal,
         transferList?: readonly Transferable[]
       ): Promise<ThreadWorkerResponse> =>
-        await pool.execute(data, name, transferList)
+        await pool.execute(data, name, abortSignal, transferList)
     )
   }
   if (!fastify.hasDecorator('mapExecute')) {
@@ -51,9 +52,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback<FastifyPoolifierOptions> = (
       async (
         data: Iterable<ThreadWorkerData>,
         name?: string,
+        abortSignals?: Iterable<AbortSignal>,
         transferList?: readonly Transferable[]
       ): Promise<ThreadWorkerResponse[]> =>
-        await pool.mapExecute(data, name, transferList)
+        await pool.mapExecute(data, name, abortSignals, transferList)
     )
   }
   done()
index 7e46d42188508dfa3c354fd6af8b577ccf29a3de..bd52a2f83d86cb81580e262f122a6115ce260c91 100644 (file)
@@ -9,11 +9,13 @@ declare module 'fastify' {
     execute: (
       data?: WorkerData,
       name?: string,
+      abortSignal?: AbortSignal,
       transferList?: readonly Transferable[]
     ) => Promise<WorkerResponse>
     mapExecute: (
       data: Iterable<WorkerData>,
       name?: string,
+      abortSignals?: Iterable<AbortSignal>,
       transferList?: readonly Transferable[]
     ) => Promise<WorkerResponse[]>
     pool: DynamicThreadPool<WorkerData, WorkerResponse>
index 2273b03177f2a1fdf5292e36a8c98f580de8f6d6..1d97e64c14beb53864f3385513693985740f5d4c 100644 (file)
@@ -40,8 +40,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback<FastifyPoolifierOptions> = (
       async (
         data?: WorkerData,
         name?: string,
+        abortSignal?: AbortSignal,
         transferList?: readonly Transferable[]
-      ): Promise<WorkerResponse> => await pool.execute(data, name, transferList)
+      ): Promise<WorkerResponse> =>
+        await pool.execute(data, name, abortSignal, transferList)
     )
   }
   if (!fastify.hasDecorator('mapExecute')) {
@@ -50,9 +52,10 @@ const fastifyPoolifierPlugin: FastifyPluginCallback<FastifyPoolifierOptions> = (
       async (
         data: Iterable<WorkerData>,
         name?: string,
+        abortSignals?: Iterable<AbortSignal>,
         transferList?: readonly Transferable[]
       ): Promise<WorkerResponse[]> =>
-        await pool.mapExecute(data, name, transferList)
+        await pool.mapExecute(data, name, abortSignals, transferList)
     )
   }
   done()
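A hedged sketch of wiring the decorated `execute()` to client disconnection in a route handler (route path, data shape, and task name are illustrative; it assumes the decorators registered above):

fastify.get('/compute', async request => {
  const abortController = new AbortController()
  // Abort the pooled task if the HTTP client goes away before it completes.
  request.raw.once('close', () => {
    abortController.abort(new Error('client disconnected'))
  })
  return await fastify.execute({ input: request.query }, 'default', abortController.signal)
})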
index dad20de3d28284a2aa3850b629e9f57a3c5dc9a9..695861966a9b18afc20b98fe319c93efde39528b 100644 (file)
@@ -667,6 +667,7 @@ export abstract class AbstractPool<
   public async execute (
     data?: Data,
     name?: string,
+    abortSignal?: AbortSignal,
     transferList?: readonly Transferable[]
   ): Promise<Response> {
     if (!this.started) {
@@ -681,10 +682,13 @@ export abstract class AbstractPool<
     if (name != null && typeof name === 'string' && name.trim().length === 0) {
       throw new TypeError('name argument must not be an empty string')
     }
+    if (abortSignal != null && !(abortSignal instanceof AbortSignal)) {
+      throw new TypeError('abortSignal argument must be an AbortSignal')
+    }
     if (transferList != null && !Array.isArray(transferList)) {
       throw new TypeError('transferList argument must be an array')
     }
-    return await this.internalExecute(data, name, transferList)
+    return await this.internalExecute(data, name, abortSignal, transferList)
   }
 
   /** @inheritDoc */
@@ -711,6 +715,7 @@ export abstract class AbstractPool<
   public async mapExecute (
     data: Iterable<Data>,
     name?: string,
+    abortSignals?: Iterable<AbortSignal>,
     transferList?: readonly Transferable[]
   ): Promise<Response[]> {
     if (!this.started) {
@@ -732,15 +737,42 @@ export abstract class AbstractPool<
     if (name != null && typeof name === 'string' && name.trim().length === 0) {
       throw new TypeError('name argument must not be an empty string')
     }
-    if (transferList != null && !Array.isArray(transferList)) {
-      throw new TypeError('transferList argument must be an array')
-    }
     if (!Array.isArray(data)) {
       data = [...data]
     }
+    if (abortSignals != null) {
+      if (typeof abortSignals[Symbol.iterator] !== 'function') {
+        throw new TypeError('abortSignals argument must be an iterable')
+      }
+      for (const abortSignal of abortSignals) {
+        if (!(abortSignal instanceof AbortSignal)) {
+          throw new TypeError(
+            'abortSignals argument must be an iterable of AbortSignal'
+          )
+        }
+      }
+      if (!Array.isArray(abortSignals)) {
+        abortSignals = [...abortSignals]
+      }
+      if ((data as Data[]).length !== (abortSignals as AbortSignal[]).length) {
+        throw new Error(
+          'data and abortSignals arguments must have the same length'
+        )
+      }
+    }
+    if (transferList != null && !Array.isArray(transferList)) {
+      throw new TypeError('transferList argument must be an array')
+    }
+    const tasks: [Data, AbortSignal | undefined][] = Array.from(
+      { length: (data as Data[]).length },
+      (_, i) => [
+        (data as Data[])[i],
+        abortSignals != null ? (abortSignals as AbortSignal[])[i] : undefined,
+      ]
+    )
     return await Promise.all(
-      (data as Data[]).map(data =>
-        this.internalExecute(data, name, transferList)
+      tasks.map(([data, abortSignal]) =>
+        this.internalExecute(data, name, abortSignal, transferList)
       )
     )
   }
@@ -969,6 +1001,7 @@ export abstract class AbstractPool<
         )
       }
     }
+    this.workerNodes[workerNodeKey].on('abortTask', this.abortTask)
   }
 
   /**
@@ -1318,6 +1351,43 @@ export abstract class AbstractPool<
     }
   }
 
+  private readonly abortTask = (eventDetail: WorkerNodeEventDetail): void => {
+    if (!this.started || this.destroying) {
+      return
+    }
+    const { taskId, workerId } = eventDetail
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    const promiseResponse = this.promiseResponseMap.get(taskId!)
+    if (promiseResponse == null) {
+      return
+    }
+    const { abortSignal, reject } = promiseResponse
+    if (abortSignal?.aborted === false) {
+      return
+    }
+    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (!workerNode.info.ready) {
+      return
+    }
+    if (this.opts.enableTasksQueue === true) {
+      for (const task of workerNode.tasksQueue) {
+        const { abortable, name } = task
+        if (taskId === task.taskId && abortable === true) {
+          workerNode.info.queuedTaskAbortion = true
+          workerNode.deleteTask(task)
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          this.promiseResponseMap.delete(taskId!)
+          workerNode.info.queuedTaskAbortion = false
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          reject(this.getAbortError(name!, taskId!))
+          return
+        }
+      }
+    }
+    this.sendToWorker(workerNodeKey, { taskId, taskOperation: 'abort' })
+  }
+
   /**
    * Adds the given worker node in the pool worker nodes.
    * @param workerNode - The worker node.
@@ -1559,8 +1629,9 @@ export abstract class AbstractPool<
    * @param task - The task to execute.
    */
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
+    const { transferList } = task
     this.beforeTaskExecutionHook(workerNodeKey, task)
-    this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.sendToWorker(workerNodeKey, task, transferList)
     this.checkAndEmitTaskExecutionEvents()
   }
 
@@ -1570,6 +1641,19 @@ export abstract class AbstractPool<
     }
   }
 
+  private readonly getAbortError = (
+    taskName: string,
+    taskId: `${string}-${string}-${string}-${string}-${string}`
+  ): Error => {
+    const abortError = this.promiseResponseMap.get(taskId)?.abortSignal
+      ?.reason as Error | string
+    return abortError instanceof Error
+      ? abortError
+      : typeof abortError === 'string'
+        ? new Error(abortError)
+        : new Error(`Task '${taskName}' id '${taskId}' aborted`)
+  }
+
   /**
    * Gets task function worker choice strategy, if any.
    * @param name - The task function name.
@@ -1691,7 +1775,8 @@ export abstract class AbstractPool<
       const workerNode = this.workerNodes[workerNodeKey]
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
-        const error = this.handleWorkerError(workerError)
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        const error = this.handleWorkerError(taskId!, workerError)
         asyncResource != null
           ? asyncResource.runInAsyncScope(reject, this.emitter, error)
           : reject(error)
@@ -1729,13 +1814,21 @@ export abstract class AbstractPool<
     }
   }
 
-  private handleWorkerError (workerError: WorkerError): Error {
-    if (workerError.error != null) {
-      return workerError.error
+  private readonly handleWorkerError = (
+    taskId: `${string}-${string}-${string}-${string}-${string}`,
+    workerError: WorkerError
+  ): Error => {
+    const { aborted, error, message, name, stack } = workerError
+    if (aborted) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      return this.getAbortError(name!, taskId)
+    }
+    if (error != null) {
+      return error
     }
-    const error = new Error(workerError.message)
-    error.stack = workerError.stack
-    return error
+    const wError = new Error(message)
+    wError.stack = stack
+    return wError
   }
 
   private readonly handleWorkerNodeBackPressureEvent = (
@@ -1896,12 +1989,14 @@ export abstract class AbstractPool<
   private async internalExecute (
     data?: Data,
     name?: string,
+    abortSignal?: AbortSignal,
     transferList?: readonly Transferable[]
   ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode(name)
       const task: Task<Data> = {
+        abortable: abortSignal != null,
         data: data ?? ({} as Data),
         name: name ?? DEFAULT_TASK_NAME,
         priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
@@ -1913,6 +2008,30 @@ export abstract class AbstractPool<
         timestamp,
         transferList,
       }
+      abortSignal?.addEventListener(
+        'abort',
+        () => {
+          this.workerNodes[workerNodeKey].emit('abortTask', {
+            taskId: task.taskId,
+            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+            workerId: this.getWorkerInfo(workerNodeKey)!.id!,
+          })
+        },
+        { once: true }
+      )
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      this.promiseResponseMap.set(task.taskId!, {
+        reject,
+        resolve,
+        workerNodeKey,
+        ...(this.emitter != null && {
+          asyncResource: new AsyncResource('poolifier:task', {
+            requireManualDestroy: true,
+            triggerAsyncId: this.emitter.asyncId,
+          }),
+        }),
+        abortSignal,
+      })
       if (
         this.opts.enableTasksQueue === false ||
         (this.opts.enableTasksQueue === true &&
@@ -1922,20 +2041,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      queueMicrotask(() => {
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        this.promiseResponseMap.set(task.taskId!, {
-          reject,
-          resolve,
-          workerNodeKey,
-          ...(this.emitter != null && {
-            asyncResource: new AsyncResource('poolifier:task', {
-              requireManualDestroy: true,
-              triggerAsyncId: this.emitter.asyncId,
-            }),
-          }),
-        })
-      })
     })
   }
 
@@ -2275,9 +2380,11 @@ export abstract class AbstractPool<
       !sourceWorkerNode.info.ready ||
       sourceWorkerNode.info.stolen ||
       sourceWorkerNode.info.stealing ||
+      sourceWorkerNode.info.queuedTaskAbortion ||
       !destinationWorkerNode.info.ready ||
       destinationWorkerNode.info.stolen ||
-      destinationWorkerNode.info.stealing
+      destinationWorkerNode.info.stealing ||
+      destinationWorkerNode.info.queuedTaskAbortion
     ) {
       return
     }
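As implemented above, aborting a still-queued abortable task deletes it from the worker node's tasks queue and rejects its promise directly, while an in-flight task triggers an `'abort'` task operation message to the worker. A hedged caller-side sketch of the resulting behavior (worker path, task data, and timing are placeholders, assuming the task outlives the timeout):

import { availableParallelism, FixedThreadPool } from 'poolifier'

const pool = new FixedThreadPool(availableParallelism(), './dist/worker.js') // worker path illustrative
const abortController = new AbortController()
setTimeout(() => {
  // The abort reason is propagated to the rejected promise when possible.
  abortController.abort(new Error('superseded by a newer request'))
}, 100)
try {
  await pool.execute({ input: 'work' }, 'default', abortController.signal)
} catch (error) {
  console.error(error) // Error: superseded by a newer request
}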
index 96050444a0969b06707f99f97fe29635ce720085..ba9d3b479fb88d8aaf52861e65063dce478672af 100644 (file)
@@ -132,12 +132,14 @@ export interface IPool<
    * Executes the specified function in the worker constructor with the task data input parameter.
    * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
    * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
-   * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
+   * @param abortSignal - The optional AbortSignal to abort the task.
+   * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
    * @returns Promise with a task function response that will be fulfilled when the task is completed.
    */
   readonly execute: (
     data?: Data,
     name?: string,
+    abortSignal?: AbortSignal,
     transferList?: readonly Transferable[]
   ) => Promise<Response>
   /**
@@ -159,12 +161,14 @@ export interface IPool<
    * Executes the specified function in the worker constructor with the tasks data iterable input parameter.
    * @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data.
    * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed.
-   * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
+   * @param abortSignals - The optional iterable of AbortSignal to abort the matching tasks.
+   * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards.
    * @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed.
    */
   readonly mapExecute: (
     data: Iterable<Data>,
     name?: string,
+    abortSignals?: Iterable<AbortSignal>,
     transferList?: readonly Transferable[]
   ) => Promise<Response[]>
   /**
index baa4e4480e8fe989240b920986b8f9ac42b9443a..d708a5cf5b00a0caf314d815c1b30d8e370821e2 100644 (file)
@@ -440,6 +440,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
     continuousStealing: false,
     dynamic: false,
     id: getWorkerId(worker),
+    queuedTaskAbortion: false,
     ready: false,
     stealing: false,
     stolen: false,
index 6c7235b8c0c90da462205fba3bcdde6b906e1f5a..8e33add0fff01894164dd2d8c4cdc553a324715f 100644 (file)
@@ -39,6 +39,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public strategyData?: StrategyData
   /** @inheritdoc */
+  public readonly tasksQueue: PriorityQueue<Task<Data>>
+  /** @inheritdoc */
   public tasksQueueBackPressureSize: number
   /** @inheritdoc */
   public usage: WorkerUsage
@@ -46,7 +48,6 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public readonly worker: Worker
   private setBackPressureFlag: boolean
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
-  private readonly tasksQueue: PriorityQueue<Task<Data>>
 
   /**
    * Constructs a new worker node.
@@ -81,6 +82,11 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     this.tasksQueue.clear()
   }
 
+  /** @inheritdoc */
+  public deleteTask (task: Task<Data>): boolean {
+    return this.tasksQueue.delete(task)
+  }
+
   /** @inheritdoc */
   public deleteTaskFunctionWorkerUsage (name: string): boolean {
     return this.taskFunctionsUsage.delete(name)
index ff3a5c828b022102671ee2cbcca7a2a1e9b331e9..edc659d37169e3833d99da62314bc0b3e5430baa 100644 (file)
@@ -2,6 +2,7 @@ import type { EventEmitter } from 'node:events'
 import type { MessageChannel, WorkerOptions } from 'node:worker_threads'
 
 import type { CircularBuffer } from '../circular-buffer.js'
+import type { PriorityQueue } from '../queues/priority-queue.js'
 import type { Task, TaskFunctionProperties } from '../utility-types.js'
 
 /**
@@ -194,6 +195,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * Clears tasks queue.
    */
   readonly clearTasksQueue: () => void
+  /**
+   * Deletes a task from the tasks queue.
+   * @param task - The task to delete.
+   * @returns `true` if the task was deleted, `false` otherwise.
+   */
+  readonly deleteTask: (task: Task<Data>) => boolean
   /**
    * Deletes task function worker usage statistics.
    * @param name - The task function name.
@@ -259,6 +266,10 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * This is used to store data that are specific to the worker choice strategy.
    */
   strategyData?: StrategyData
+  /**
+   * Tasks queue.
+   */
+  readonly tasksQueue: PriorityQueue<Task<Data>>
   /**
    * Tasks queue back pressure size.
    * This is the number of tasks that can be enqueued before the worker node has back pressure.
@@ -319,6 +330,11 @@ export interface WorkerInfo {
    * Worker id.
    */
   readonly id: number | undefined
+  /**
+   * Queued task abortion flag.
+   * This flag is set to `true` when the worker node is aborting a queued task.
+   */
+  queuedTaskAbortion: boolean
   /**
    * Ready flag.
    */
@@ -348,6 +364,7 @@ export interface WorkerInfo {
  * @internal
  */
 export interface WorkerNodeEventDetail {
+  taskId?: `${string}-${string}-${string}-${string}-${string}`
   workerId?: number
   workerNodeKey?: number
 }
index b3090adff68f1bb222d04546b66f1e539d1d3eee..9bbd32fe4c17f37006e78377da30a9657c392100 100644 (file)
@@ -36,6 +36,19 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
     this.size = 0
   }
 
+  /** @inheritdoc */
+  public delete (data: T): boolean {
+    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+    const index = this.nodeArray.findIndex(node => node?.data === data)
+    if (index !== -1) {
+      this.nodeArray.splice(index, 1)
+      this.nodeArray.length = this.capacity
+      --this.size
+      return true
+    }
+    return false
+  }
+
   /** @inheritdoc */
   public dequeue (): T | undefined {
     if (this.empty()) {
index 51be4266f6a2695b358db188172dad767a8302a7..86961b396988ec0cedaa33b55028f745df55bcd2 100644 (file)
@@ -119,6 +119,36 @@ export class PriorityQueue<T> {
     this.maxSize = 0
   }
 
+  /**
+   * Deletes the given data from the priority queue.
+   * @param data - Data to delete.
+   * @returns `true` if the data was deleted, `false` otherwise.
+   */
+  public delete (data: T): boolean {
+    let node: PriorityQueueNode<T> | undefined = this.tail
+    let prev: PriorityQueueNode<T> | undefined
+    while (node != null) {
+      if (node.delete(data)) {
+        if (node.empty()) {
+          if (node === this.tail && node.next != null) {
+            this.tail = node.next
+            delete node.next
+          } else if (node.next != null && prev != null) {
+            prev.next = node.next
+            delete node.next
+          } else if (node.next == null && prev != null) {
+            delete prev.next
+            this.head = prev
+          }
+        }
+        return true
+      }
+      prev = node
+      node = node.next
+    }
+    return false
+  }
+
   /**
    * Dequeue data from the priority queue.
    * @param bucket - The prioritized bucket to dequeue from.
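A hedged sketch of the `delete()` contract added above (`PriorityQueue` is an internal class; the import path mirrors the test suite's and may differ in published builds):

import { PriorityQueue } from '../lib/queues/priority-queue.cjs'

const queue = new PriorityQueue(2, true) // bucket size 2, priority enabled
queue.enqueue(1)
queue.enqueue(2, -1) // lower priority value dequeues first
queue.enqueue(3)
console.info(queue.delete(2)) // true: the matching data is removed and the size decreases
console.info(queue.delete(4)) // false: data not found, queue left untouched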
index 1e374b594a91052f440d51d47686655cea83abda..c8aba72ebb301e5be7b3893cc2d74a7a81d25dfc 100644 (file)
@@ -32,6 +32,12 @@ export interface IFixedQueue<T> {
    * Clears the fixed queue.
    */
   clear: () => void
+  /**
+   * Deletes the given data from the fixed queue.
+   * @param data - Data to delete.
+   * @returns `true` if the data was deleted, `false` otherwise.
+   */
+  delete: (data: T) => boolean
   /**
    * Dequeue data from the fixed queue.
    * @returns The dequeued data or `undefined` if the fixed queue is empty.
index 571de00c137e83b7daf4868c547901a1cf1282c5..cd395c5519a3cac44784f4c0b6d3528aee862ef9 100644 (file)
@@ -56,6 +56,11 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
    * Task functions properties.
    */
   readonly taskFunctionsProperties?: TaskFunctionProperties[]
+  /**
+   * Task operation:
+   * - `'abort'` - Abort a task.
+   */
+  readonly taskOperation?: 'abort'
   /**
    * Task performance.
    */
@@ -76,6 +81,10 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
  * @internal
  */
 export interface PromiseResponseWrapper<Response = unknown> {
+  /**
+   * The task abort signal.
+   */
+  readonly abortSignal?: AbortSignal
   /**
    * The asynchronous resource used to track the task execution.
    */
@@ -100,6 +109,10 @@ export interface PromiseResponseWrapper<Response = unknown> {
  * @internal
  */
 export interface Task<Data = unknown> {
+  /**
+   * Whether the task is abortable or not.
+   */
+  readonly abortable?: boolean
   /**
    * Task input data that will be passed to the worker.
    */
@@ -177,6 +190,10 @@ export interface TaskPerformance {
  * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
  */
 export interface WorkerError<Data = unknown> {
+  /**
+   * Whether the error is an abort error or not.
+   */
+  readonly aborted: boolean
   /**
    * Data triggering the error.
    */
diff --git a/src/worker/abort-error.ts b/src/worker/abort-error.ts
new file mode 100644 (file)
index 0000000..198a969
--- /dev/null
@@ -0,0 +1,6 @@
+export class AbortError extends Error {
+  public constructor (message: string) {
+    super(message)
+    this.name = 'AbortError'
+  }
+}
index b54834d7753bbc724dd392e0d02865ad600715e8..3b5e6af07c69322298d01cc875e520fd7852f097 100644 (file)
@@ -1,6 +1,7 @@
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 
+import { EventEmitter } from 'node:events'
 import { performance } from 'node:perf_hooks'
 
 import type {
@@ -18,6 +19,7 @@ import type {
   TaskFunctions,
   TaskSyncFunction,
 } from './task-functions.js'
+import type { AbortTaskEventDetail } from './worker-types.js'
 
 import {
   buildTaskFunctionProperties,
@@ -26,6 +28,7 @@ import {
   isAsyncFunction,
   isPlainObject,
 } from '../utils.js'
+import { AbortError } from './abort-error.js'
 import {
   checkTaskFunctionName,
   checkValidTaskFunctionObjectEntry,
@@ -60,7 +63,7 @@ export abstract class AbstractWorker<
   MainWorker extends MessagePort | Worker,
   Data = unknown,
   Response = unknown
-> {
+> extends EventEmitter {
   /**
    * Handler id of the `activeInterval` worker activity check.
    */
@@ -79,6 +82,14 @@ export abstract class AbstractWorker<
    */
   protected statistics?: WorkerStatistics
 
+  /**
+   * Task abort functions processed by the worker when task operation 'abort' is received.
+   */
+  protected taskAbortFunctions: Map<
+    `${string}-${string}-${string}-${string}-${string}`,
+    () => void
+  >
+
   /**
    * Task function object(s) processed by the worker when the pool's `execute` method is invoked.
    */
@@ -97,10 +108,21 @@ export abstract class AbstractWorker<
     taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
     protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
   ) {
+    super()
     if (this.isMain == null) {
       throw new Error('isMain parameter is mandatory')
     }
     this.checkTaskFunctions(taskFunctions)
+    this.taskAbortFunctions = new Map<
+      `${string}-${string}-${string}-${string}-${string}`,
+      () => void
+    >()
+    this.on('abortTask', (eventDetail: AbortTaskEventDetail) => {
+      const { taskId } = eventDetail
+      if (this.taskAbortFunctions.has(taskId)) {
+        this.taskAbortFunctions.get(taskId)?.()
+      }
+    })
     this.checkWorkerOptions(this.opts)
     if (!this.isMain) {
       // Should be once() but Node.js on windows has a bug that prevents it from working
@@ -268,6 +290,7 @@ export abstract class AbstractWorker<
    * @returns The worker error object.
    */
   protected abstract handleError (error: Error): {
+    aborted: boolean
     error?: Error
     message: string
     stack?: string
@@ -296,6 +319,7 @@ export abstract class AbstractWorker<
         this.sendToMainWorker({ kill: 'failure' })
       }
     }
+    this.removeAllListeners()
   }
 
   /**
@@ -376,6 +400,7 @@ export abstract class AbstractWorker<
       statistics,
       taskFunctionOperation,
       taskId,
+      taskOperation,
     } = message
     if (statistics != null) {
       // Statistics message received
@@ -389,6 +414,9 @@ export abstract class AbstractWorker<
     } else if (taskId != null && data != null) {
       // Task message received
       this.run(message)
+    } else if (taskOperation === 'abort' && taskId != null) {
+      // Abort task operation message received
+      this.emit('abortTask', { taskId })
     } else if (kill === true) {
       // Kill message received
       this.handleKillMessage(message)
@@ -400,7 +428,7 @@ export abstract class AbstractWorker<
    * @param task - The task to execute.
    */
   protected readonly run = (task: Task<Data>): void => {
-    const { data, name, taskId } = task
+    const { abortable, data, name, taskId } = task
     const taskFunctionName = name ?? DEFAULT_TASK_NAME
     if (!this.taskFunctions.has(taskFunctionName)) {
       this.sendToMainWorker({
@@ -416,7 +444,14 @@ export abstract class AbstractWorker<
       })
       return
     }
-    const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction
+    let fn: TaskFunction<Data, Response>
+    if (abortable === true) {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      fn = this.getAbortableTaskFunction(taskFunctionName, taskId!)
+    } else {
+      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+      fn = this.taskFunctions.get(taskFunctionName)!.taskFunction
+    }
     if (isAsyncFunction(fn)) {
       this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
     } else {
@@ -433,7 +468,7 @@ export abstract class AbstractWorker<
     fn: TaskAsyncFunction<Data, Response>,
     task: Task<Data>
   ): void => {
-    const { data, name, taskId } = task
+    const { abortable, data, name, taskId } = task
     let taskPerformance = this.beginTaskPerformance(name)
     fn(data)
       .then(res => {
@@ -457,6 +492,10 @@ export abstract class AbstractWorker<
       })
       .finally(() => {
         this.updateLastTaskTimestamp()
+        if (abortable === true) {
+          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+          this.taskAbortFunctions.delete(taskId!)
+        }
       })
       .catch(EMPTY_FUNCTION)
   }
@@ -470,7 +509,7 @@ export abstract class AbstractWorker<
     fn: TaskSyncFunction<Data, Response>,
     task: Task<Data>
   ): void => {
-    const { data, name, taskId } = task
+    const { abortable, data, name, taskId } = task
     try {
       let taskPerformance = this.beginTaskPerformance(name)
       const res = fn(data)
@@ -491,6 +530,10 @@ export abstract class AbstractWorker<
       })
     } finally {
       this.updateLastTaskTimestamp()
+      if (abortable === true) {
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        this.taskAbortFunctions.delete(taskId!)
+      }
     }
   }
 
@@ -624,6 +667,35 @@ export abstract class AbstractWorker<
     }
   }
 
+  /**
+   * Gets abortable task function.
+   * An abortable promise is built to permit the task to be aborted.
+   * @param name - The name of the task.
+   * @param taskId - The task id.
+   * @returns The abortable task function.
+   */
+  private getAbortableTaskFunction (
+    name: string,
+    taskId: `${string}-${string}-${string}-${string}-${string}`
+  ): TaskAsyncFunction<Data, Response> {
+    return async (data?: Data): Promise<Response> =>
+      await new Promise<Response>(
+        (resolve, reject: (reason?: unknown) => void) => {
+          this.taskAbortFunctions.set(taskId, () => {
+            reject(new AbortError(`Task '${name}' id '${taskId}' aborted`))
+          })
+          const taskFunction = this.taskFunctions.get(name)?.taskFunction
+          if (isAsyncFunction(taskFunction)) {
+            ;(taskFunction as TaskAsyncFunction<Data, Response>)(data)
+              .then(resolve)
+              .catch(reject)
+          } else {
+            resolve((taskFunction as TaskSyncFunction<Data, Response>)(data))
+          }
+        }
+      )
+  }
+
   /**
    * Starts the worker check active interval.
    */
index 394e367431da4dadd5dc7f420bad041424e9d8c9..4a0b56554a7b6f51e42388fad2f4379458070eee 100644 (file)
@@ -4,6 +4,7 @@ import type { MessageValue } from '../utility-types.js'
 import type { TaskFunction, TaskFunctions } from './task-functions.js'
 import type { WorkerOptions } from './worker-options.js'
 
+import { AbortError } from './abort-error.js'
 import { AbstractWorker } from './abstract-worker.js'
 
 /**
@@ -43,8 +44,16 @@ export class ClusterWorker<
   /**
    * @inheritDoc
    */
-  protected handleError (error: Error): { message: string; stack?: string } {
-    return { message: error.message, stack: error.stack }
+  protected handleError (error: Error): {
+    aborted: boolean
+    message: string
+    stack?: string
+  } {
+    return {
+      aborted: error instanceof AbortError,
+      message: error.message,
+      stack: error.stack,
+    }
   }
 
   /** @inheritDoc */
index 9cbba3a57059d289c50d278bc9d78472b9d3d99b..2bf172b8ee27b892c76d8f0c2857f70b883dd8ef 100644 (file)
@@ -9,6 +9,7 @@ import type { MessageValue } from '../utility-types.js'
 import type { TaskFunction, TaskFunctions } from './task-functions.js'
 import type { WorkerOptions } from './worker-options.js'
 
+import { AbortError } from './abort-error.js'
 import { AbstractWorker } from './abstract-worker.js'
 
 /**
@@ -54,11 +55,17 @@ export class ThreadWorker<
    * @inheritDoc
    */
   protected handleError (error: Error): {
+    aborted: boolean
     error: Error
     message: string
     stack?: string
   } {
-    return { error, message: error.message, stack: error.stack }
+    return {
+      aborted: error instanceof AbortError,
+      error,
+      message: error.message,
+      stack: error.stack,
+    }
   }
 
   /** @inheritDoc */
diff --git a/src/worker/worker-types.ts b/src/worker/worker-types.ts
new file mode 100644 (file)
index 0000000..706b155
--- /dev/null
@@ -0,0 +1,3 @@
+export interface AbortTaskEventDetail {
+  taskId: `${string}-${string}-${string}-${string}-${string}`
+}
index 46487a0ba27d19feb1ba4ed4b677c5fe6c9d9797..542e491c157271625172c596589fdb5457eca9da 100644 (file)
@@ -872,6 +872,7 @@ describe('Abstract pool test suite', () => {
         continuousStealing: false,
         dynamic: false,
         id: expect.any(Number),
+        queuedTaskAbortion: false,
         ready: true,
         stealing: false,
         stolen: false,
@@ -892,6 +893,7 @@ describe('Abstract pool test suite', () => {
         continuousStealing: false,
         dynamic: false,
         id: expect.any(Number),
+        queuedTaskAbortion: false,
         ready: true,
         stealing: false,
         stolen: false,
@@ -959,8 +961,11 @@ describe('Abstract pool test suite', () => {
       new TypeError('name argument must not be an empty string')
     )
     await expect(pool.execute(undefined, undefined, {})).rejects.toThrow(
-      new TypeError('transferList argument must be an array')
+      new TypeError('abortSignal argument must be an AbortSignal')
     )
+    await expect(
+      pool.execute(undefined, undefined, new AbortController().signal, {})
+    ).rejects.toThrow(new TypeError('transferList argument must be an array'))
     await expect(pool.execute(undefined, 'unknown')).rejects.toThrow(
       new Error("Task function 'unknown' not found")
     )
@@ -1917,9 +1922,30 @@ describe('Abstract pool test suite', () => {
     await expect(pool.mapExecute([undefined], '')).rejects.toThrow(
       new TypeError('name argument must not be an empty string')
     )
-    await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow(
-      new TypeError('transferList argument must be an array')
+    await expect(pool.mapExecute([undefined], undefined, 0)).rejects.toThrow(
+      new TypeError('abortSignals argument must be an iterable')
+    )
+    await expect(
+      pool.mapExecute([undefined], undefined, [undefined])
+    ).rejects.toThrow(
+      new TypeError('abortSignals argument must be an iterable of AbortSignal')
+    )
+    await expect(
+      pool.mapExecute([undefined], undefined, [
+        new AbortController().signal,
+        new AbortController().signal,
+      ])
+    ).rejects.toThrow(
+      new Error('data and abortSignals arguments must have the same length')
     )
+    await expect(
+      pool.mapExecute(
+        [undefined],
+        undefined,
+        [new AbortController().signal],
+        {}
+      )
+    ).rejects.toThrow(new TypeError('transferList argument must be an array'))
     await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow(
       new Error("Task function 'unknown' not found")
     )
index e911a60c0a14139918adb9152809512eb2d05f89..e41a8a6fcd56b11cbfec73b1c8092e188ec7e81d 100644 (file)
@@ -180,6 +180,7 @@ describe('Fixed cluster pool test suite', () => {
     expect(inError.message).toStrictEqual('Error Message from ClusterWorker')
     expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
+      aborted: false,
       data,
       message: inError.message,
       name: DEFAULT_TASK_NAME,
@@ -214,6 +215,7 @@ describe('Fixed cluster pool test suite', () => {
     )
     expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
+      aborted: false,
       data,
       message: inError.message,
       name: DEFAULT_TASK_NAME,
@@ -235,6 +237,33 @@ describe('Fixed cluster pool test suite', () => {
     expect(usedTime).toBeGreaterThanOrEqual(2000)
   })
 
+  it('Verify that task can be aborted', async () => {
+    let error
+
+    try {
+      await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeInstanceOf(Error)
+    expect(error.name).toBe('TimeoutError')
+    expect(error.message).toBe('The operation was aborted due to timeout')
+    expect(error.stack).toBeDefined()
+
+    const abortController = new AbortController()
+    setTimeout(() => {
+      abortController.abort(new Error('Task aborted'))
+    }, 500)
+    try {
+      await asyncErrorPool.execute({}, 'default', abortController.signal)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeInstanceOf(Error)
+    expect(error.message).toBe('Task aborted')
+    expect(error.stack).toBeDefined()
+  })
+
   it('Shutdown test', async () => {
     const exitPromise = waitWorkerEvents(pool, 'exit', numberOfWorkers)
     expect(pool.emitter.eventNames()).toStrictEqual([])
index dd01556fa45565b795a282dd796519b2b582688d..ac7937095840a3ba7ae4d1b988d511f79e0e02e6 100644 (file)
@@ -165,7 +165,7 @@ describe('Fixed thread pool test suite', () => {
     let error
     let result
     try {
-      result = await pool.execute(undefined, undefined, [
+      result = await pool.execute(undefined, undefined, undefined, [
         new ArrayBuffer(16),
         new MessageChannel().port1,
       ])
@@ -175,7 +175,7 @@ describe('Fixed thread pool test suite', () => {
     expect(result).toStrictEqual({ ok: 1 })
     expect(error).toBeUndefined()
     try {
-      result = await pool.execute(undefined, undefined, [
+      result = await pool.execute(undefined, undefined, undefined, [
         new SharedArrayBuffer(16),
       ])
     } catch (e) {
@@ -206,6 +206,7 @@ describe('Fixed thread pool test suite', () => {
     expect(inError.message).toStrictEqual('Error Message from ThreadWorker')
     expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
+      aborted: false,
       data,
       error: inError,
       message: inError.message,
@@ -241,6 +242,7 @@ describe('Fixed thread pool test suite', () => {
     )
     expect(typeof inError.stack === 'string').toBe(true)
     expect(taskError).toStrictEqual({
+      aborted: false,
       data,
       error: inError,
       message: inError.message,
@@ -263,6 +265,33 @@ describe('Fixed thread pool test suite', () => {
     expect(usedTime).toBeGreaterThanOrEqual(2000)
   })
 
+  it('Verify that task can be aborted', async () => {
+    let error
+
+    try {
+      await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeInstanceOf(Error)
+    expect(error.name).toBe('TimeoutError')
+    expect(error.message).toBe('The operation was aborted due to timeout')
+    expect(error.stack).toBeDefined()
+
+    const abortController = new AbortController()
+    setTimeout(() => {
+      abortController.abort(new Error('Task aborted'))
+    }, 500)
+    try {
+      await asyncErrorPool.execute({}, 'default', abortController.signal)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeInstanceOf(Error)
+    expect(error.message).toBe('Task aborted')
+    expect(error.stack).toBeDefined()
+  })
+
   it('Shutdown test', async () => {
     const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
     expect(pool.emitter.eventNames()).toStrictEqual([])
index 18e26c9cfa4e4b1714029662e1505b6f60164b12..5f7f5102ec535ead82e7995f6234e8e147c13369 100644 (file)
@@ -134,6 +134,7 @@ describe('Pool utils test suite', () => {
       continuousStealing: false,
       dynamic: false,
       id: threadWorker.threadId,
+      queuedTaskAbortion: false,
       ready: false,
       stealing: false,
       stolen: false,
@@ -151,6 +152,7 @@ describe('Pool utils test suite', () => {
       continuousStealing: false,
       dynamic: false,
       id: clusterWorker.id,
+      queuedTaskAbortion: false,
       ready: false,
       stealing: false,
       stolen: false,
index 753cb31d67452ca02ccb1239a5c2f60ccfb0cb7a..709c67699d609c975d13aa41d6c7efa564a4e3d2 100644 (file)
@@ -240,6 +240,7 @@ describe('Worker node test suite', () => {
       continuousStealing: false,
       dynamic: false,
       id: threadWorkerNode.worker.threadId,
+      queuedTaskAbortion: false,
       ready: false,
       stealing: false,
       stolen: false,
@@ -302,6 +303,7 @@ describe('Worker node test suite', () => {
       continuousStealing: false,
       dynamic: false,
       id: clusterWorkerNode.worker.id,
+      queuedTaskAbortion: false,
       ready: false,
       stealing: false,
       stolen: false,
index d91ea712eeeb3d7812836d4e8c543a3f396b19c2..1f1eb8c31964d548a2830cc99e52170f6ca90bde 100644 (file)
@@ -138,6 +138,41 @@ describe('Fixed priority queue test suite', () => {
     expect(fixedPriorityQueue.capacity).toBe(queueSize)
   })
 
+  it('Verify delete() behavior', () => {
+    const fixedPriorityQueue = new FixedPriorityQueue()
+    fixedPriorityQueue.enqueue(1)
+    fixedPriorityQueue.enqueue(2, -1)
+    fixedPriorityQueue.enqueue(3)
+    expect(fixedPriorityQueue.start).toBe(0)
+    expect(fixedPriorityQueue.size).toBe(3)
+    expect(fixedPriorityQueue.nodeArray).toMatchObject([
+      { data: 2, priority: -1 },
+      { data: 1, priority: 0 },
+      { data: 3, priority: 0 },
+    ])
+    expect(fixedPriorityQueue.delete(2)).toBe(true)
+    expect(fixedPriorityQueue.start).toBe(0)
+    expect(fixedPriorityQueue.size).toBe(2)
+    expect(fixedPriorityQueue.nodeArray).toMatchObject([
+      { data: 1, priority: 0 },
+      { data: 3, priority: 0 },
+    ])
+    expect(fixedPriorityQueue.delete(3)).toBe(true)
+    expect(fixedPriorityQueue.start).toBe(0)
+    expect(fixedPriorityQueue.size).toBe(1)
+    expect(fixedPriorityQueue.nodeArray).toMatchObject([
+      { data: 1, priority: 0 },
+    ])
+    expect(fixedPriorityQueue.delete(1)).toBe(true)
+    expect(fixedPriorityQueue.start).toBe(0)
+    expect(fixedPriorityQueue.size).toBe(0)
+    expect(fixedPriorityQueue.nodeArray).toMatchObject([])
+    expect(fixedPriorityQueue.delete(2)).toBe(false)
+    expect(fixedPriorityQueue.start).toBe(0)
+    expect(fixedPriorityQueue.size).toBe(0)
+    expect(fixedPriorityQueue.nodeArray).toMatchObject([])
+  })
+
   it('Verify iterator behavior', () => {
     const fixedPriorityQueue = new FixedPriorityQueue()
     fixedPriorityQueue.enqueue(1)
index 7b1053ad799e9da06b84cebc9b3d89593544d7de..d1231643f7496353def85b77ef80a9c130a3e5bd 100644 (file)
@@ -136,6 +136,39 @@ describe('Fixed queue test suite', () => {
     expect(fixedQueue.capacity).toBe(queueSize)
   })
 
+  it('Verify delete() behavior', () => {
+    const fixedQueue = new FixedQueue()
+    fixedQueue.enqueue(1)
+    fixedQueue.enqueue(2, -1)
+    fixedQueue.enqueue(3)
+    expect(fixedQueue.start).toBe(0)
+    expect(fixedQueue.size).toBe(3)
+    expect(fixedQueue.nodeArray).toMatchObject([
+      { data: 1, priority: 0 },
+      { data: 2, priority: -1 },
+      { data: 3, priority: 0 },
+    ])
+    expect(fixedQueue.delete(2)).toBe(true)
+    expect(fixedQueue.start).toBe(0)
+    expect(fixedQueue.size).toBe(2)
+    expect(fixedQueue.nodeArray).toMatchObject([
+      { data: 1, priority: 0 },
+      { data: 3, priority: 0 },
+    ])
+    expect(fixedQueue.delete(3)).toBe(true)
+    expect(fixedQueue.start).toBe(0)
+    expect(fixedQueue.size).toBe(1)
+    expect(fixedQueue.nodeArray).toMatchObject([{ data: 1, priority: 0 }])
+    expect(fixedQueue.delete(1)).toBe(true)
+    expect(fixedQueue.start).toBe(0)
+    expect(fixedQueue.size).toBe(0)
+    expect(fixedQueue.nodeArray).toMatchObject([])
+    expect(fixedQueue.delete(2)).toBe(false)
+    expect(fixedQueue.start).toBe(0)
+    expect(fixedQueue.size).toBe(0)
+    expect(fixedQueue.nodeArray).toMatchObject([])
+  })
+
   it('Verify iterator behavior', () => {
     const fixedQueue = new FixedQueue()
     fixedQueue.enqueue(1)
index 48a68732662afbd1231dc8611fac387e7945d8e0..64174908e7dd049f0f3ebc65e9bd55b2ac03b7d9 100644 (file)
@@ -300,6 +300,112 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
   })
 
+  it('Verify default bucket size delete() behavior', () => {
+    const priorityQueue = new PriorityQueue(defaultBucketSize, true)
+    priorityQueue.enqueue(1)
+    priorityQueue.enqueue(2)
+    priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(3)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(2)).toBe(true)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(2)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(3)).toBe(true)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(1)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(1)).toBe(true)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(priorityQueue.tail.empty()).toBe(true)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(2)).toBe(false)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(priorityQueue.tail.empty()).toBe(true)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+  })
+
+  it('Verify bucketSize=2 delete() behavior', () => {
+    const priorityQueue = new PriorityQueue(2, true)
+    priorityQueue.enqueue(1)
+    priorityQueue.enqueue(2)
+    priorityQueue.enqueue(3)
+    priorityQueue.enqueue(3, -1)
+    priorityQueue.enqueue(1, 1)
+    priorityQueue.enqueue(3, -2)
+    expect(priorityQueue.buckets).toBe(3)
+    expect(priorityQueue.size).toBe(6)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(2)).toBe(true)
+    expect(priorityQueue.buckets).toBe(2)
+    expect(priorityQueue.size).toBe(5)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(3)).toBe(true)
+    expect(priorityQueue.buckets).toBe(2)
+    expect(priorityQueue.size).toBe(4)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(1)).toBe(true)
+    expect(priorityQueue.buckets).toBe(1)
+    expect(priorityQueue.size).toBe(3)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(1)).toBe(true)
+    expect(priorityQueue.buckets).toBe(1)
+    expect(priorityQueue.size).toBe(2)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(3)).toBe(true)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(1)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(2)).toBe(false)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(1)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(false)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    expect(priorityQueue.delete(3)).toBe(true)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(priorityQueue.tail.empty()).toBe(true)
+    expect(priorityQueue.tail.next).toBe(undefined)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+  })
+
   it('Verify enablePriority setter behavior', () => {
     const priorityQueue = new PriorityQueue(2)
     expect(priorityQueue.enablePriority).toBe(false)
diff --git a/tests/worker/abort-error.test.mjs b/tests/worker/abort-error.test.mjs
new file mode 100644 (file)
index 0000000..1de5f4e
--- /dev/null
@@ -0,0 +1,16 @@
+import { expect } from '@std/expect'
+
+import { AbortError } from '../../lib/worker/abort-error.cjs'
+
+describe('Abort error test suite', () => {
+  it('Verify constructor() behavior', () => {
+    const errorMessage = 'This is an abort error message'
+    const abortError = new AbortError(errorMessage)
+
+    expect(abortError).toBeInstanceOf(AbortError)
+    expect(abortError).toBeInstanceOf(Error)
+    expect(abortError.name).toBe('AbortError')
+    expect(abortError.message).toBe(errorMessage)
+    expect(abortError.stack).toBeDefined()
+  })
+})
index eaf46d22220fcd04809744dc04eab9a69be9148d..c0df3d447558abff9b57bd808e6df1f796b8f846 100644 (file)
@@ -94,6 +94,7 @@ describe('Cluster worker test suite', () => {
     const error = new Error('Error as an error')
     const worker = new ClusterWorker(() => {})
     expect(worker.handleError(error)).toStrictEqual({
+      aborted: false,
       message: error.message,
       stack: error.stack,
     })
index 4d10cf622685d4ec920bed4bc189f3ead0cf18da..49e1e572b63dfa2c8e834a56379a251552564ca8 100644 (file)
@@ -94,6 +94,7 @@ describe('Thread worker test suite', () => {
     const error = new Error('Error as an error')
     const worker = new ThreadWorker(() => {})
     expect(worker.handleError(error)).toStrictEqual({
+      aborted: false,
       error,
       message: error.message,
       stack: error.stack,