this.opts.errorHandler ?? EMPTY_FUNCTION
)
workerNode.registerWorkerEventHandler('error', (error: Error) => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
workerNode.info.ready = false
this.emitter?.emit(PoolEvents.error, error)
if (
}
}
if (this.started && this.opts.enableTasksQueue === true) {
- this.redistributeQueuedTasks(workerNodeKey)
+ this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
}
workerNode.terminate().catch(error => {
this.emitter?.emit(PoolEvents.error, error)
this.opts.exitHandler ?? EMPTY_FUNCTION
)
workerNode.registerOnceWorkerEventHandler('exit', () => {
- this.removeWorkerNode(workerNode.worker)
+ this.removeWorkerNode(workerNode)
})
const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
}
/**
- * Removes the worker node associated to the given worker from the pool worker nodes.
+ * Removes the worker node from the pool worker nodes.
*
- * @param worker - The worker.
+ * @param workerNode - The worker node.
*/
- private removeWorkerNode (worker: Worker): void {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
this.workerChoiceStrategyContext.remove(workerNodeKey)
'Cannot construct a worker node without worker node options'
)
}
- if (opts != null && !isPlainObject(opts)) {
+ if (!isPlainObject(opts)) {
throw new TypeError(
'Cannot construct a worker node with invalid options: must be a plain object'
)
* Constructs a new worker node.
*
* @param type - The worker type.
- * @param filePath - The worker file path.
+ * @param filePath - Path to the worker file.
* @param opts - The worker node options.
*/
constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
-import { MessageChannel } from 'node:worker_threads'
+import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
+import { Worker as ClusterWorker } from 'node:cluster'
import { expect } from 'expect'
import { WorkerNode } from '../../lib/pools/worker-node.js'
import { WorkerTypes } from '../../lib/index.js'
)
)
expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
+ expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
expect(threadWorkerNode.info).toStrictEqual({
id: threadWorkerNode.worker.threadId,
type: WorkerTypes.thread,
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
+ expect(clusterWorkerNode.worker).toBeInstanceOf(ClusterWorker)
expect(clusterWorkerNode.info).toStrictEqual({
id: clusterWorkerNode.worker.id,
type: WorkerTypes.cluster,
const start = performance.now()
await sleep(1000)
const elapsed = performance.now() - start
- expect(elapsed).toBeGreaterThanOrEqual(999)
+ expect(elapsed).toBeGreaterThanOrEqual(1000)
})
it('Verify exponentialDelay() behavior', () => {