+ private shallExecuteTask (workerNodeKey: number): boolean {
+ return (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ )
+ }
+
+ /** @inheritDoc */
+ public async execute (
+ data?: Data,
+ name?: string,
+ transferList?: TransferListItem[]
+ ): Promise<Response> {
+ return await new Promise<Response>((resolve, reject) => {
+ if (!this.started) {
+ reject(new Error('Cannot execute a task on not started pool'))
+ return
+ }
+ if (this.destroying) {
+ reject(new Error('Cannot execute a task on destroying pool'))
+ return
+ }
+ if (name != null && typeof name !== 'string') {
+ reject(new TypeError('name argument must be a string'))
+ return
+ }
+ if (
+ name != null &&
+ typeof name === 'string' &&
+ name.trim().length === 0
+ ) {
+ reject(new TypeError('name argument must not be an empty string'))
+ return
+ }
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ return
+ }
+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode()
+ const task: Task<Data> = {
+ name: name ?? DEFAULT_TASK_NAME,
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ transferList,
+ timestamp,
+ taskId: randomUUID()
+ }
+ this.promiseResponseMap.set(task.taskId as string, {
+ resolve,
+ reject,
+ workerNodeKey
+ })
+ if (
+ this.opts.enableTasksQueue === false ||
+ (this.opts.enableTasksQueue === true &&
+ this.shallExecuteTask(workerNodeKey))
+ ) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ })
+ }
+
+ /** @inheritdoc */
+ public start (): void {
+ if (this.started) {
+ throw new Error('Cannot start an already started pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot start an already starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot start a destroying pool')
+ }
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.starting = false
+ this.started = true
+ }
+
+ /** @inheritDoc */
+ public async destroy (): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot destroy an already destroyed pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot destroy an starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot destroy an already destroying pool')
+ }
+ this.destroying = true
+ await Promise.all(
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+ await this.destroyWorkerNode(workerNodeKey)
+ })
+ )
+ this.emitter?.emit(PoolEvents.destroy, this.info)
+ this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
+ this.readyEventEmitted = false
+ this.destroying = false
+ this.started = false
+ }
+
+ protected async sendKillMessageToWorker (
+ workerNodeKey: number
+ ): Promise<void> {
+ await new Promise<void>((resolve, reject) => {
+ const killMessageListener = (message: MessageValue<Response>): void => {
+ this.checkMessageWorkerId(message)
+ if (message.kill === 'success') {
+ resolve()
+ } else if (message.kill === 'failure') {
+ reject(
+ new Error(
+ `Kill message handling failed on worker ${
+ message.workerId as number
+ }`
+ )
+ )
+ }
+ }
+ // FIXME: should be registered only once
+ this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
+ this.sendToWorker(workerNodeKey, { kill: true })
+ })
+ }
+
+ /**
+ * Terminates the worker node given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+
+ /**
+ * Setup hook to execute code before worker nodes are created in the abstract constructor.
+ * Can be overridden.
+ *
+ * @virtual
+ */
+ protected setupHook (): void {
+ /* Intentionally empty */
+ }
+
+ /**
+ * Should return whether the worker is the main worker or not.
+ */
+ protected abstract isMain (): boolean
+
+ /**
+ * Hook executed before the worker task execution.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
+ */
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ if (this.workerNodes[workerNodeKey]?.usage != null) {
+ const workerUsage = this.workerNodes[workerNodeKey].usage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+ task.name as string
+ ) != null
+ ) {
+ const taskFunctionWorkerUsage = this.workerNodes[
+ workerNodeKey
+ ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+ }
+ }
+
+ /**
+ * Hook executed after the worker task execution.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The worker node key.