*/
protected readonly max?: number
+ /**
+ * The task functions added at runtime map:
+ * - `key`: The task function name.
+ * - `value`: The task function itself.
+ */
+ private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
/**
* Whether the pool is starting or not.
*/
this.setupHook()
+ this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
this.starting = true
this.startPool()
this.starting = false
* @param workerId - The worker id.
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerNodeKeyByWorkerId (workerId: number): number {
+ private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
return this.workerNodes.findIndex(
workerNode => workerNode.info.id === workerId
)
}
private async sendTaskFunctionOperationToWorker (
+ workerNodeKey: number,
+ message: MessageValue<Data>
+ ): Promise<boolean> {
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
+ return await new Promise<boolean>((resolve, reject) => {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === true
+ ) {
+ resolve(true)
+ } else if (
+ message.workerId === workerId &&
+ message.taskFunctionOperationStatus === false
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ })
+ this.sendToWorker(workerNodeKey, message)
+ })
+ }
+
+ private async sendTaskFunctionOperationToWorkers (
message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
new Error(
`Task function operation ${
message.taskFunctionOperation as string
- } failed on worker ${message.workerId}`
+ } failed on worker ${message.workerId as number}`
)
)
}
}
})
- this.sendToWorker(workerNodeKey, {
- ...message,
- workerId: this.getWorkerInfo(workerNodeKey).id as number
- })
+ this.sendToWorker(workerNodeKey, message)
}
})
}
/** @inheritDoc */
public async addTaskFunction (
name: string,
- taskFunction: TaskFunction
+ taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
- return await this.sendTaskFunctionOperationToWorker({
+ this.taskFunctions.set(name, taskFunction)
+ return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionName: name,
taskFunction: taskFunction.toString()
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
- return await this.sendTaskFunctionOperationToWorker({
+ this.taskFunctions.delete(name)
+ return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'remove',
taskFunctionName: name
})
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
- return await this.sendTaskFunctionOperationToWorker({
+ return await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'default',
taskFunctionName: name
})
data: data ?? ({} as Data),
transferList,
timestamp,
- workerId: this.getWorkerInfo(workerNodeKey).id as number,
taskId: randomUUID()
}
this.promiseResponseMap.set(task.taskId as string, {
}
protected async sendKillMessageToWorker (
- workerNodeKey: number,
- workerId: number
+ workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.registerWorkerMessageListener(workerNodeKey, message => {
if (message.kill === 'success') {
resolve()
} else if (message.kill === 'failure') {
- reject(new Error(`Worker ${workerId} kill message handling failed`))
+ reject(
+ new Error(
+ `Worker ${
+ message.workerId as number
+ } kill message handling failed`
+ )
+ )
}
})
- this.sendToWorker(workerNodeKey, { kill: true, workerId })
+ this.sendToWorker(workerNodeKey, { kill: true })
})
}
checkActive: true,
workerId: workerInfo.id as number
})
+ if (this.taskFunctions.size > 0) {
+ for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+ this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+ taskFunctionOperation: 'add',
+ taskFunctionName,
+ taskFunction: taskFunction.toString()
+ }).catch(error => {
+ this.emitter?.emit(PoolEvents.error, error)
+ })
+ }
+ }
workerInfo.dynamic = true
if (
this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu.aggregate
- },
- workerId: this.getWorkerInfo(workerNodeKey).id as number
+ }
})
}
},
0
)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
- const task = {
- ...(this.dequeueTask(workerNodeKey) as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = this.dequeueTask(workerNodeKey) as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
private taskStealingOnEmptyQueue (workerId: number): void {
const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
- const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
const workerNodes = this.workerNodes
.slice()
.sort(
workerNode.usage.tasks.queued > 0
)
if (sourceWorkerNode != null) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: destinationWorkerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(destinationWorkerNodeKey)) {
this.executeTask(destinationWorkerNodeKey, task)
} else {
workerNode.usage.tasks.queued <
(this.opts.tasksQueueOptions?.size as number) - sizeOffset
) {
- const task = {
- ...(sourceWorkerNode.popTask() as Task<Data>),
- workerId: workerNode.info.id as number
- }
+ const task = sourceWorkerNode.popTask() as Task<Data>
if (this.shallExecuteTask(workerNodeKey)) {
this.executeTask(workerNodeKey, task)
} else {
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
).taskFunctionNames = message.taskFunctionNames
- } else if (message.taskFunctionOperation != null) {
- // Task function operation response received from worker
}
}
}
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
if (message.ready === false) {
- throw new Error(`Worker ${message.workerId} failed to initialize`)
+ throw new Error(
+ `Worker ${message.workerId as number} failed to initialize`
+ )
}
const workerInfo = this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)