PoolTypes,
type TasksQueueOptions
} from './pool'
-import type {
- IWorker,
- IWorkerNode,
- WorkerInfo,
- WorkerType,
- WorkerUsage
+import {
+ type IWorker,
+ type IWorkerNode,
+ type WorkerInfo,
+ type WorkerType,
+ type WorkerUsage
} from './worker'
import {
type MeasurementStatisticsRequirements,
message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
- const workerId = this.getWorkerInfo(workerNodeKey).id as number
- this.registerWorkerMessageListener(workerNodeKey, message => {
+ const taskFunctionOperationListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ const workerId = this.getWorkerInfo(workerNodeKey).id as number
if (
- message.workerId === workerId &&
- message.taskFunctionOperationStatus === true
- ) {
- resolve(true)
- } else if (
- message.workerId === workerId &&
- message.taskFunctionOperationStatus === false
+ message.taskFunctionOperationStatus != null &&
+ message.workerId === workerId
) {
- reject(
- new Error(
- `Task function operation '${
- message.taskFunctionOperation as string
- }' failed on worker ${message.workerId} with error: '${
- message.workerError?.message as string
- }'`
+ if (message.taskFunctionOperationStatus) {
+ resolve(true)
+ } else if (!message.taskFunctionOperationStatus) {
+ reject(
+ new Error(
+ `Task function operation '${
+ message.taskFunctionOperation as string
+ }' failed on worker ${message.workerId} with error: '${
+ message.workerError?.message as string
+ }'`
+ )
)
+ }
+ this.deregisterWorkerMessageListener(
+ this.getWorkerNodeKeyByWorkerId(message.workerId),
+ taskFunctionOperationListener
)
}
- })
+ }
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ taskFunctionOperationListener
+ )
this.sendToWorker(workerNodeKey, message)
})
}
message: MessageValue<Data>
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
- const responsesReceived = new Array<MessageValue<Data | Response>>()
- for (const [workerNodeKey] of this.workerNodes.entries()) {
- this.registerWorkerMessageListener(workerNodeKey, message => {
- if (message.taskFunctionOperationStatus != null) {
- responsesReceived.push(message)
+ const responsesReceived = new Array<MessageValue<Response>>()
+ const taskFunctionOperationsListener = (
+ message: MessageValue<Response>
+ ): void => {
+ this.checkMessageWorkerId(message)
+ if (message.taskFunctionOperationStatus != null) {
+ responsesReceived.push(message)
+ if (responsesReceived.length === this.workerNodes.length) {
if (
- responsesReceived.length === this.workerNodes.length &&
responsesReceived.every(
message => message.taskFunctionOperationStatus === true
)
) {
resolve(true)
} else if (
- responsesReceived.length === this.workerNodes.length &&
responsesReceived.some(
message => message.taskFunctionOperationStatus === false
)
)
)
}
+ this.deregisterWorkerMessageListener(
+ this.getWorkerNodeKeyByWorkerId(message.workerId),
+ taskFunctionOperationsListener
+ )
}
- })
+ }
+ }
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
+ this.registerWorkerMessageListener(
+ workerNodeKey,
+ taskFunctionOperationsListener
+ )
this.sendToWorker(workerNodeKey, message)
}
})
workerNodeKey: number
): Promise<void> {
await new Promise<void>((resolve, reject) => {
- this.registerWorkerMessageListener(workerNodeKey, message => {
+ const killMessageListener = (message: MessageValue<Response>): void => {
+ this.checkMessageWorkerId(message)
if (message.kill === 'success') {
resolve()
} else if (message.kill === 'failure') {
reject(
new Error(
- `Worker ${
+ `Kill message handling failed on worker ${
message.workerId as number
- } kill message handling failed`
+ }`
)
)
}
- })
+ }
+ this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
this.sendToWorker(workerNodeKey, { kill: true })
})
}
listener: (message: MessageValue<Message>) => void
): void
+ /**
+ * Registers once a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerOnceWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
+ /**
+ * Deregisters a listener callback on the worker given its worker node key.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param listener - The message listener callback.
+ */
+ protected abstract deregisterWorkerMessageListener<
+ Message extends Data | Response
+ >(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+
/**
* Method hooked up after a worker node has been newly created.
* Can be overridden.
const workerNode = this.workerNodes[workerNodeKey]
const worker = workerNode.worker
const waitWorkerExit = new Promise<void>(resolve => {
- worker.on('exit', () => {
+ worker.once('exit', () => {
resolve()
})
})
- worker.on('disconnect', () => {
+ worker.once('disconnect', () => {
worker.kill()
})
await this.sendKillMessageToWorker(workerNodeKey)
this.workerNodes[workerNodeKey].worker.on('message', listener)
}
+ /** @inheritDoc */
+ protected registerOnceWorkerMessageListener<Message extends Data | Response>(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ this.workerNodes[workerNodeKey].worker.once('message', listener)
+ }
+
+ /** @inheritDoc */
+ protected deregisterWorkerMessageListener<Message extends Data | Response>(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ this.workerNodes[workerNodeKey].worker.off('message', listener)
+ }
+
/** @inheritDoc */
protected createWorker (): Worker {
return cluster.fork(this.opts.env)
const workerNode = this.workerNodes[workerNodeKey]
const worker = workerNode.worker
const waitWorkerExit = new Promise<void>(resolve => {
- worker.on('exit', () => {
+ worker.once('exit', () => {
resolve()
})
})
).port1.on('message', listener)
}
+ /** @inheritDoc */
+ protected registerOnceWorkerMessageListener<Message extends Data | Response>(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ (
+ this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ ).port1.once('message', listener)
+ }
+
+ /** @inheritDoc */
+ protected deregisterWorkerMessageListener<Message extends Data | Response>(
+ workerNodeKey: number,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ (
+ this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+ ).port1.off('message', listener)
+ }
+
/** @inheritDoc */
protected createWorker (): Worker {
return new Worker(this.filePath, {