updateMeasurementStatistics
} from '../utils'
import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
PoolEmitter,
}
}
+ private async sendTaskFunctionOperationToWorker (
+ message: Omit<MessageValue<Data>, 'workerId'>
+ ): Promise<boolean> {
+ return await new Promise<boolean>((resolve, reject) => {
+ const responsesReceived = new Array<MessageValue<Data | Response>>()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(workerNodeKey, message => {
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.every(
+ message => message.taskFunctionOperationStatus === true
+ )
+ ) {
+ resolve(true)
+ } else if (
+ responsesReceived.length === this.workerNodes.length &&
+ responsesReceived.some(
+ message => message.taskFunctionOperationStatus === false
+ )
+ ) {
+ reject(
+ new Error(
+ `Task function operation ${
+ message.taskFunctionOperation as string
+ } failed on worker ${message.workerId}`
+ )
+ )
+ }
+ }
+ })
+ this.sendToWorker(workerNodeKey, {
+ ...message,
+ workerId: this.getWorkerInfo(workerNodeKey).id as number
+ })
+ }
+ })
+ }
+
+ /** @inheritDoc */
+ public hasTaskFunction (name: string): boolean {
+ for (const workerNode of this.workerNodes) {
+ if (
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.includes(name)
+ ) {
+ return true
+ }
+ }
+ return false
+ }
+
+ /** @inheritDoc */
+ public async addTaskFunction (
+ name: string,
+ taskFunction: TaskFunction
+ ): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
+ taskFunctionOperation: 'add',
+ taskFunctionName: name,
+ taskFunction: taskFunction.toString()
+ })
+ }
+
/** @inheritDoc */
- public listTaskFunctions (): string[] {
+ public async removeTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
+ taskFunctionOperation: 'remove',
+ taskFunctionName: name
+ })
+ }
+
+ /** @inheritDoc */
+ public listTaskFunctionNames (): string[] {
for (const workerNode of this.workerNodes) {
if (
- Array.isArray(workerNode.info.taskFunctions) &&
- workerNode.info.taskFunctions.length > 0
+ Array.isArray(workerNode.info.taskFunctionNames) &&
+ workerNode.info.taskFunctionNames.length > 0
) {
- return workerNode.info.taskFunctions
+ return workerNode.info.taskFunctionNames
}
}
return []
}
+ /** @inheritDoc */
+ public async setDefaultTaskFunction (name: string): Promise<boolean> {
+ return await this.sendTaskFunctionOperationToWorker({
+ taskFunctionOperation: 'default',
+ taskFunctionName: name
+ })
+ }
+
private shallExecuteTask (workerNodeKey: number): boolean {
return (
this.tasksQueueSize(workerNodeKey) === 0 &&
const workerInfo = this.getWorkerInfo(workerNodeKey)
return (
workerInfo != null &&
- Array.isArray(workerInfo.taskFunctions) &&
- workerInfo.taskFunctions.length > 2
+ Array.isArray(workerInfo.taskFunctionNames) &&
+ workerInfo.taskFunctionNames.length > 2
)
}
) {
--workerTaskStatistics.executing
}
- if (message.taskError == null) {
+ if (message.workerError == null) {
++workerTaskStatistics.executed
} else {
++workerTaskStatistics.failed
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
updateMeasurementStatistics(
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (message.taskError != null) {
+ if (message.workerError != null) {
return
}
const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
this.emitter?.emit(PoolEvents.error, error)
if (
this.opts.restartWorkerOnError === true &&
- !this.starting &&
- this.started
+ this.started &&
+ !this.starting
) {
if (workerInfo.dynamic) {
this.createAndSetupDynamicWorkerNode()
protected workerListener (): (message: MessageValue<Response>) => void {
return message => {
this.checkMessageWorkerId(message)
- if (message.ready != null && message.taskFunctions != null) {
+ if (message.ready != null && message.taskFunctionNames != null) {
// Worker ready response received from worker
this.handleWorkerReadyResponse(message)
} else if (message.taskId != null) {
// Task execution response received from worker
this.handleTaskExecutionResponse(message)
- } else if (message.taskFunctions != null) {
- // Task functions message received from worker
+ } else if (message.taskFunctionNames != null) {
+ // Task function names message received from worker
this.getWorkerInfo(
this.getWorkerNodeKeyByWorkerId(message.workerId)
- ).taskFunctions = message.taskFunctions
+ ).taskFunctionNames = message.taskFunctionNames
+ } else if (message.taskFunctionOperation != null) {
+ // Task function operation response received from worker
}
}
}
this.getWorkerNodeKeyByWorkerId(message.workerId)
)
workerInfo.ready = message.ready as boolean
- workerInfo.taskFunctions = message.taskFunctions
+ workerInfo.taskFunctionNames = message.taskFunctionNames
if (this.emitter != null && this.ready) {
this.emitter.emit(PoolEvents.ready, this.info)
}
}
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
- const { taskId, taskError, data } = message
+ const { taskId, workerError, data } = message
const promiseResponse = this.promiseResponseMap.get(taskId as string)
if (promiseResponse != null) {
- if (taskError != null) {
- this.emitter?.emit(PoolEvents.taskError, taskError)
- promiseResponse.reject(taskError.message)
+ if (workerError != null) {
+ this.emitter?.emit(PoolEvents.taskError, workerError)
+ promiseResponse.reject(workerError.message)
} else {
promiseResponse.resolve(data as Response)
}