throw new Error('Worker message received without worker id')
} else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
throw new Error(
- `Worker message received from unknown worker '${message.workerId}'`
+ `Worker message received from unknown worker '${message.workerId.toString()}'`
)
}
}
} else {
reject(
new Error(
- `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Task function operation '${message.taskFunctionOperation?.toString()}' failed on worker ${message.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ message.workerError?.message
+ }'`
)
)
}
new Error(
`Task function operation '${
message.taskFunctionOperation as string
- }' failed on worker ${errorResponse?.workerId} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
errorResponse?.workerError?.message
}'`
)
const workerNodeKey = this.chooseWorkerNode(name)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
-
data: data ?? ({} as Data),
priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy(
})
}
+
+ /** @inheritDoc */
+ public mapExecute (
+ data: Iterable<Data>,
+ name?: string,
+ transferList?: readonly TransferListItem[]
+ ): Promise<Response[]> {
+ return Promise.all(
+ [...data].map(data => this.execute(data, name, transferList))
+ )
+ }
+
/**
* Starts the minimum number of workers.
- * @param initWorkerNodeUsage
+ * @param initWorkerNodeUsage - Whether to initialize the worker node usage or not. @defaultValue false
*/
private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey] == null) {
resolve()
return
} else if (message.kill === 'failure') {
reject(
new Error(
- `Kill message handling failed on worker ${message.workerId}`
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Kill message handling failed on worker ${message.workerId?.toString()}`
)
)
}
workerNodeKey: number,
task: Task<Data>
): void {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
message: MessageValue<Response>
): void {
let needWorkerChoiceStrategiesUpdate = false
-
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (this.workerNodes[workerNodeKey]?.usage != null) {
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
) {
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
-
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.terminate().catch((error: unknown) => {
this.emitter?.emit(PoolEvents.error, error)
})
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
-
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.stolen
}
previousTaskName?: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
-
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
++workerNode.usage.tasks.sequentiallyStolen
}
taskName: string
): void {
const workerNode = this.workerNodes[workerNodeKey]
-
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (workerNode?.usage != null) {
workerNode.usage.tasks.sequentiallyStolen = 0
}
const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo == null) {
throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
if (
const workerInfo = this.getWorkerInfo(workerNodeKey)
if (workerInfo == null) {
throw new Error(
- `Worker node with key '${workerNodeKey}' not found in pool`
+ `Worker node with key '${workerNodeKey.toString()}' not found in pool`
)
}
workerInfo.stealing = true
/**
* This method is the message listener registered on each worker.
- * @param message
+ * @param message - The message received from the worker.
*/
protected readonly workerMessageListener = (
message: MessageValue<Response>
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
const { workerId, ready, taskFunctionsProperties } = message
if (ready == null || !ready) {
- throw new Error(`Worker ${workerId} failed to initialize`)
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
}
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
const workerNode = this.workerNodes[workerNodeKey]
this.afterTaskExecutionHook(workerNodeKey, message)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.promiseResponseMap.delete(taskId!)
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode?.emit('taskFinished', taskId)
if (
this.opts.enableTasksQueue === true &&
!this.destroying &&
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
workerNode != null
) {
const workerNodeTasksUsage = workerNode.usage.tasks