+ if (
+ sourceWorkerNode.info.ready &&
+ sourceWorkerNode.info.id !== workerId &&
+ sourceWorkerNode.usage.tasks.queued > 0
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: destinationWorkerNode.info.id as number
+ }
+ if (
+ this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
+ destinationWorkerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(destinationWorkerNodeKey, task)
+ } else {
+ this.enqueueTask(destinationWorkerNodeKey, task)
+ }
+ if (destinationWorkerNode?.usage != null) {
+ ++destinationWorkerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
+ destinationWorkerNode.getTaskFunctionWorkerUsage(
+ task.name as string
+ ) != null
+ ) {
+ const taskFunctionWorkerUsage =
+ destinationWorkerNode.getTaskFunctionWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ break
+ }
+ }
+ }
+
+ private tasksStealingOnBackPressure (workerId: number): void {
+ const sourceWorkerNode =
+ this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+ const workerNodes = this.workerNodes
+ .slice()
+ .sort(
+ (workerNodeA, workerNodeB) =>
+ workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
+ )
+ for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
+ if (
+ sourceWorkerNode.usage.tasks.queued > 0 &&
+ workerNode.info.ready &&
+ workerNode.info.id !== workerId &&
+ workerNode.usage.tasks.queued <
+ (this.opts.tasksQueueOptions?.size as number) - 1
+ ) {
+ const task = {
+ ...(sourceWorkerNode.popTask() as Task<Data>),
+ workerId: workerNode.info.id as number
+ }
+ if (
+ this.tasksQueueSize(workerNodeKey) === 0 &&
+ workerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(workerNodeKey, task)
+ } else {
+ this.enqueueTask(workerNodeKey, task)
+ }
+ if (workerNode?.usage != null) {
+ ++workerNode.usage.tasks.stolen
+ }
+ if (
+ this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+ workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
+ ) {
+ const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
+ task.name as string
+ ) as WorkerUsage
+ ++taskFunctionWorkerUsage.tasks.stolen
+ }
+ }
+ }
+ }
+
+ /**
+ * This method is the listener registered for each worker message.
+ *
+ * @returns The listener function to execute when a message is received from a worker.
+ */
+ protected workerListener (): (message: MessageValue<Response>) => void {
+ return (message) => {
+ this.checkMessageWorkerId(message)
+ if (message.ready != null && message.taskFunctions != null) {
+ // Worker ready response received from worker
+ this.handleWorkerReadyResponse(message)
+ } else if (message.taskId != null) {
+ // Task execution response received from worker
+ this.handleTaskExecutionResponse(message)
+ } else if (message.taskFunctions != null) {
+ // Task functions message received from worker
+ (
+ this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ) as WorkerInfo
+ ).taskFunctions = message.taskFunctions
+ }
+ }
+ }
+
+ private handleWorkerReadyResponse (message: MessageValue<Response>): void {
+ if (message.ready === false) {
+ throw new Error(`Worker ${message.workerId} failed to initialize`)
+ }
+ const workerInfo = this.getWorkerInfo(
+ this.getWorkerNodeKeyByWorkerId(message.workerId)
+ ) as WorkerInfo
+ workerInfo.ready = message.ready as boolean
+ workerInfo.taskFunctions = message.taskFunctions
+ if (this.emitter != null && this.ready) {
+ this.emitter.emit(PoolEvents.ready, this.info)
+ }
+ }
+
+ private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+ const { taskId, taskError, data } = message
+ const promiseResponse = this.promiseResponseMap.get(taskId as string)
+ if (promiseResponse != null) {
+ if (taskError != null) {
+ this.emitter?.emit(PoolEvents.taskError, taskError)
+ promiseResponse.reject(taskError.message)
+ } else {
+ promiseResponse.resolve(data as Response)
+ }
+ const workerNodeKey = promiseResponse.workerNodeKey
+ this.afterTaskExecutionHook(workerNodeKey, message)
+ this.promiseResponseMap.delete(taskId as string)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0 &&
+ this.workerNodes[workerNodeKey].usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }
+ this.workerChoiceStrategyContext.update(workerNodeKey)
+ }
+ }
+
+ private checkAndEmitTaskExecutionEvents (): void {
+ if (this.busy) {
+ this.emitter?.emit(PoolEvents.busy, this.info)
+ }
+ }
+
+ private checkAndEmitTaskQueuingEvents (): void {
+ if (this.hasBackPressure()) {
+ this.emitter?.emit(PoolEvents.backPressure, this.info)
+ }
+ }
+
+ private checkAndEmitDynamicWorkerCreationEvents (): void {
+ if (this.type === PoolTypes.dynamic) {
+ if (this.full) {