+ public hasTaskFunction (name: string): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.name === name
+ )
+ }
+
+ /** @inheritDoc */
+ public async addTaskFunction (
+ name: string,
+ fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
+ ): Promise<boolean> {
+ if (typeof name !== 'string') {
+ throw new TypeError('name argument must be a string')
+ }
+ if (typeof name === 'string' && name.trim().length === 0) {
+ throw new TypeError('name argument must not be an empty string')
+ }
+ if (typeof fn === 'function') {
+ fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
+ }
+ if (typeof fn.taskFunction !== 'function') {
+ throw new TypeError('taskFunction property must be a function')
+ }
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'add',
+ taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+ taskFunction: fn.taskFunction.toString(),
+ })
+ this.taskFunctions.set(name, fn)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return opResult
+ }
+
+ /** @inheritDoc */
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ if (!this.taskFunctions.has(name)) {
+ throw new Error(
+ 'Cannot remove a task function not handled on the pool side'
+ )
+ }
+ const opResult = await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'remove',
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
+ })
+ for (const workerNode of this.workerNodes) {
+ workerNode.deleteTaskFunctionWorkerUsage(name)
+ }
+ this.taskFunctions.delete(name)
+ this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
+ this.getWorkerChoiceStrategies()
+ )
+ for (const workerNodeKey of this.workerNodes.keys()) {
+ this.sendStatisticsMessageToWorker(workerNodeKey)
+ }
+ return opResult
+ }
+
+ /** @inheritDoc */
+ public listTaskFunctionsProperties (): TaskFunctionProperties[] {
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionsProperties) &&
+ workerNode.info.taskFunctionsProperties.length > 0
+ ) {
+ return workerNode.info.taskFunctionsProperties
+ }
+ }
+ return []
+ }
+
+ /**
+ * Gets task function worker choice strategy, if any.
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ name = name ?? DEFAULT_TASK_NAME
+ const taskFunctionsProperties = this.listTaskFunctionsProperties()
+ if (name === DEFAULT_TASK_NAME) {
+ name = taskFunctionsProperties[1]?.name
+ }
+ return taskFunctionsProperties.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function worker choice strategy, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = (
+ workerNodeKey: number,
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+
+ /**
+ * Gets worker node task function priority, if any.
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ const workerInfo = this.getWorkerInfo(workerNodeKey)
+ if (workerInfo == null) {
+ return
+ }
+ name = name ?? DEFAULT_TASK_NAME
+ if (name === DEFAULT_TASK_NAME) {
+ name = workerInfo.taskFunctionsProperties?.[1]?.name
+ }
+ return workerInfo.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+
+ /**
+ * Gets the worker choice strategies registered in this pool.
+ * @returns The worker choice strategies.
+ */
+ private readonly getWorkerChoiceStrategies =
+ (): Set<WorkerChoiceStrategy> => {
+ return new Set([
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.workerChoiceStrategy!,
+ ...(this.listTaskFunctionsProperties()
+ .map(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.strategy
+ )
+ .filter(
+ (strategy: WorkerChoiceStrategy | undefined) => strategy != null
+ ) as WorkerChoiceStrategy[]),
+ ])
+ }
+
+ /** @inheritDoc */
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorkers({
+ taskFunctionOperation: 'default',
+ taskFunctionProperties: buildTaskFunctionProperties(
+ name,
+ this.taskFunctions.get(name)
+ ),
+ })
+ }
+
+ private shallExecuteTask (workerNodeKey: number): boolean {
+ return (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.opts.tasksQueueOptions!.concurrency!
+ )
+ }
+
+ /** @inheritDoc */
+ public async execute (
+ data?: Data,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response> {
+ return await new Promise<Response>((resolve, reject) => {
+ if (!this.started) {
+ reject(new Error('Cannot execute a task on not started pool'))
+ return
+ }
+ if (this.destroying) {
+ reject(new Error('Cannot execute a task on destroying pool'))
+ return
+ }
+ if (name != null && typeof name !== 'string') {
+ reject(new TypeError('name argument must be a string'))
+ return
+ }
+ if (
+ name != null &&
+ typeof name === 'string' &&
+ name.trim().length === 0
+ ) {
+ reject(new TypeError('name argument must not be an empty string'))
+ return
+ }
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ return
+ }
+ const timestamp = performance.now()
+ const workerNodeKey = this.chooseWorkerNode(name)
+ const task: Task<Data> = {
+ name: name ?? DEFAULT_TASK_NAME,
+ data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
+ workerNodeKey,
+ name
+ ),
+ transferList,
+ timestamp,
+ taskId: randomUUID(),
+ }
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.promiseResponseMap.set(task.taskId!, {
+ resolve,
+ reject,
+ workerNodeKey,
+ ...(this.emitter != null && {
+ asyncResource: new AsyncResource('poolifier:task', {
+ triggerAsyncId: this.emitter.asyncId,
+ requireManualDestroy: true,
+ }),
+ }),
+ })
+ if (
+ this.opts.enableTasksQueue === false ||
+ (this.opts.enableTasksQueue === true &&
+ this.shallExecuteTask(workerNodeKey))
+ ) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ })
+ }
+
+
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (data == null) {
+ throw new TypeError('data argument must be a defined iterable')
+ }
+ if (typeof data[Symbol.iterator] !== 'function') {
+ throw new TypeError('data argument must be an iterable')
+ }
+ if (!Array.isArray(data)) {
+ data = [...data]
+ }
+ return Promise.all(
+ (data as Data[]).map(data => this.execute(data, name, transferList))
+ )
+ }
+
+ /**
+ * Starts the minimum number of workers.
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
+ */
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
+ if (this.minimumNumberOfWorkers === 0) {
+ return
+ }
+ this.startingMinimumNumberOfWorkers = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.minimumNumberOfWorkers
+ ) {
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
+ }
+ this.startingMinimumNumberOfWorkers = false
+ }
+
+ /** @inheritdoc */
+ public start (): void {
+ if (this.started) {
+ throw new Error('Cannot start an already started pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot start an already starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot start a destroying pool')
+ }
+ this.starting = true
+ this.startMinimumNumberOfWorkers()
+ this.startTimestamp = performance.now()
+ this.starting = false
+ this.started = true
+ }
+
+ /** @inheritDoc */
+ public async destroy (): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot destroy an already destroyed pool')
+ }
+ if (this.starting) {
+ throw new Error('Cannot destroy an starting pool')
+ }
+ if (this.destroying) {
+ throw new Error('Cannot destroy an already destroying pool')
+ }
+ this.destroying = true
+ await Promise.all(
+ this.workerNodes.map(async (_, workerNodeKey) => {