+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode()
+ const task: Task<Data> = {
+ name: name ?? DEFAULT_TASK_NAME,
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+ data: data ?? ({} as Data),
+ transferList,
+ timestamp,
+ taskId: randomUUID()
+ }
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
+ resolve,
+ reject,
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true
+ })
+ })
+ })
+ if (
+ this.opts.enableTasksQueue === false ||
+ (this.opts.enableTasksQueue === true &&
+ this.shallExecuteTask(workerNodeKey))
+ ) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ })
+ }
+
+ /** @inheritdoc */
+ public start (): void {
+ if (this.started) {
+ throw new Error('Cannot start an already started pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot start an already starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot start a destroying pool')
+ }
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.starting = false
+ this.started = true
+ }
+
+ /** @inheritDoc */
+ public async destroy (): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot destroy an already destroyed pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot destroy an starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot destroy an already destroying pool')
+ }
+ this.destroying = true
+ await Promise.all(
+ this.workerNodes.map(async (_workerNode, workerNodeKey) => {
+ await this.destroyWorkerNode(workerNodeKey)
+ })
+ )
+ this.emitter?.emit(PoolEvents.destroy, this.info)
+ this.emitter?.emitDestroy()
+ this.emitter?.removeAllListeners()
+ this.readyEventEmitted = false
+ this.destroying = false
+ this.started = false
+ }
+
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
+ await new Promise<void>((resolve, reject) => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey] == null) {
+ resolve()
+ return
+ }
+ const killMessageListener = (message: MessageValue<Response>): void => {
+ this.checkMessageWorkerId(message)
+ if (message.kill === 'success') {
+ resolve()
+ } else if (message.kill === 'failure') {
+ reject(
+ new Error(
+ `Kill message handling failed on worker ${message.workerId}`
+ )
+ )
+ }
+ }
+ // FIXME: should be registered only once
+ this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
+ this.sendToWorker(workerNodeKey, { kill: true })
+ })
+ }
+
+ /**
+ * Terminates the worker node given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ */
+ protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+ this.flagWorkerNodeAsNotReady(workerNodeKey)
+ const flushedTasks = this.flushTasksQueue(workerNodeKey)
+ const workerNode = this.workerNodes[workerNodeKey]
+ await waitWorkerNodeEvents(
+ workerNode,
+ 'taskFinished',
+ flushedTasks,
+ this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
+ )
+ await this.sendKillMessageToWorker(workerNodeKey)
+ await workerNode.terminate()
+ }
+
+ /**
+ * Setup hook to execute code before worker nodes are created in the abstract constructor.
+ * Can be overridden.
+ *
+ * @virtual
+ */
+ protected setupHook (): void {
+ /* Intentionally empty */
+ }
+
+ /**
+ * Should return whether the worker is the main worker or not.
+ */
+ protected abstract isMain (): boolean
+
+ /**
+ * Hook executed before the worker task execution.
+ * Can be overridden.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
+ */
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.workerNodes[workerNodeKey]?.usage != null) {
+ const workerUsage = this.workerNodes[workerNodeKey].usage
+ ++workerUsage.tasks.executing
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ workerUsage,
+ task
+ )
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
+ null
+ ) {
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const taskFunctionWorkerUsage = this.workerNodes[
+ workerNodeKey
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ ].getTaskFunctionWorkerUsage(task.name!)!
+ ++taskFunctionWorkerUsage.tasks.executing
+ updateWaitTimeWorkerUsage(
+ this.workerChoiceStrategyContext,
+ taskFunctionWorkerUsage,
+ task
+ )