private addWorkerNode (worker: Worker): number {
const workerNode = new WorkerNode<Worker, Data>(
worker,
- this.worker,
this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
)
// Flag the worker node as ready at pool startup.
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
- const worker = this.workerNodes[workerNodeKey].worker
+ const workerNode = this.workerNodes[workerNodeKey]
+ const worker = workerNode.worker
const waitWorkerExit = new Promise<void>((resolve) => {
worker.on('exit', () => {
resolve()
worker.on('disconnect', () => {
worker.kill()
})
- await this.sendKillMessageToWorker(workerNodeKey, worker.id)
+ await this.sendKillMessageToWorker(
+ workerNodeKey,
+ workerNode.info.id as number
+ )
worker.disconnect()
await waitWorkerExit
}
protected sendStartupMessageToWorker (workerNodeKey: number): void {
this.sendToWorker(workerNodeKey, {
ready: false,
- workerId: this.workerNodes[workerNodeKey].worker.id
+ workerId: this.workerNodes[workerNodeKey].info.id as number
})
}
resolve()
})
})
- await this.sendKillMessageToWorker(workerNodeKey, worker.threadId)
+ await this.sendKillMessageToWorker(
+ workerNodeKey,
+ workerNode.info.id as number
+ )
workerNode.closeChannel()
await worker.terminate()
await waitWorkerExit
/** @inheritDoc */
protected sendStartupMessageToWorker (workerNodeKey: number): void {
- const worker = this.workerNodes[workerNodeKey].worker
- const port2: MessagePort = (
- this.workerNodes[workerNodeKey].messageChannel as MessageChannel
- ).port2
+ const workerNode = this.workerNodes[workerNodeKey]
+ const worker = workerNode.worker
+ const port2: MessagePort = (workerNode.messageChannel as MessageChannel)
+ .port2
worker.postMessage(
{
ready: false,
- workerId: worker.threadId,
+ workerId: workerNode.info.id,
port: port2
},
[port2]
DEFAULT_TASK_NAME,
EMPTY_FUNCTION,
exponentialDelay,
+ getWorkerId,
+ getWorkerType,
sleep
} from '../utils'
import { Deque } from '../deque'
* Constructs a new worker node.
*
* @param worker - The worker.
- * @param workerType - The worker type.
* @param tasksQueueBackPressureSize - The tasks queue back pressure size.
*/
- constructor (
- worker: Worker,
- workerType: WorkerType,
- tasksQueueBackPressureSize: number
- ) {
+ constructor (worker: Worker, tasksQueueBackPressureSize: number) {
if (worker == null) {
throw new TypeError('Cannot construct a worker node without a worker')
}
- if (workerType == null) {
- throw new TypeError(
- 'Cannot construct a worker node without a worker type'
- )
- }
+
if (tasksQueueBackPressureSize == null) {
throw new TypeError(
'Cannot construct a worker node without a tasks queue back pressure size'
)
}
this.worker = worker
- this.info = this.initWorkerInfo(worker, workerType)
+ this.info = this.initWorkerInfo(worker)
this.usage = this.initWorkerUsage()
- if (workerType === WorkerTypes.thread) {
+ if (this.info.type === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
await this.startOnEmptyQueue()
}
- private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
+ private initWorkerInfo (worker: Worker): WorkerInfo {
return {
- id: this.getWorkerId(worker, workerType),
- type: workerType,
+ id: getWorkerId(worker),
+ type: getWorkerType(worker) as WorkerType,
dynamic: false,
ready: false
}
}
}
}
-
- /**
- * Gets the worker id.
- *
- * @param worker - The worker.
- * @param workerType - The worker type.
- * @returns The worker id.
- */
- private getWorkerId (
- worker: Worker,
- workerType: WorkerType
- ): number | undefined {
- if (workerType === WorkerTypes.thread) {
- return worker.threadId
- } else if (workerType === WorkerTypes.cluster) {
- return worker.id
- }
- }
}
import * as os from 'node:os'
import { webcrypto } from 'node:crypto'
+import { Worker as ClusterWorker } from 'node:cluster'
+import { Worker as ThreadWorker } from 'node:worker_threads'
import type {
MeasurementStatisticsRequirements,
WorkerChoiceStrategyOptions
} from './pools/selection-strategies/selection-strategies-types'
import type { KillBehavior } from './worker/worker-options'
-import type { MeasurementStatistics } from './pools/worker'
+import {
+ type IWorker,
+ type MeasurementStatistics,
+ type WorkerType,
+ WorkerTypes
+} from './pools/worker'
/**
* Default task name.
)
}
+/**
+ * Returns the worker type of the given worker.
+ *
+ * @param worker - The worker to get the type of.
+ * @returns The worker type of the given worker.
+ * @internal
+ */
+export const getWorkerType = <Worker extends IWorker>(
+ worker: Worker
+): WorkerType | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return WorkerTypes.thread
+ }
+ if (worker instanceof ClusterWorker) {
+ return WorkerTypes.cluster
+ }
+}
+
+/**
+ * Returns the worker id of the given worker.
+ *
+ * @param worker - The worker to get the id of.
+ * @returns The worker id of the given worker.
+ * @internal
+ */
+export const getWorkerId = <Worker extends IWorker>(
+ worker: Worker
+): number | undefined => {
+ if (worker instanceof ThreadWorker) {
+ return worker.threadId
+ } else if (worker instanceof ClusterWorker) {
+ return worker.id
+ }
+}
+
/**
* Computes the median of the given data set.
*
const { MessageChannel, Worker } = require('worker_threads')
+const cluster = require('cluster')
const { expect } = require('expect')
const { WorkerNode } = require('../../../lib/pools/worker-node')
const { WorkerTypes } = require('../../../lib')
const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
describe('Worker node test suite', () => {
- const worker = new Worker('./tests/worker-files/thread/testWorker.js')
- const workerNode = new WorkerNode(worker, WorkerTypes.thread, 12)
+ const threadWorker = new Worker('./tests/worker-files/thread/testWorker.js')
+ const clusterWorker = cluster.fork()
+ const threadWorkerNode = new WorkerNode(threadWorker, 12)
+ const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
it('Worker node instantiation', () => {
expect(() => new WorkerNode()).toThrowError(
new TypeError('Cannot construct a worker node without a worker')
)
- expect(() => new WorkerNode(worker)).toThrowError(
- new TypeError('Cannot construct a worker node without a worker type')
- )
- expect(() => new WorkerNode(worker, WorkerTypes.thread)).toThrowError(
+ expect(() => new WorkerNode(threadWorker)).toThrowError(
new TypeError(
'Cannot construct a worker node without a tasks queue back pressure size'
)
)
expect(
- () =>
- new WorkerNode(
- worker,
- WorkerTypes.thread,
- 'invalidTasksQueueBackPressureSize'
- )
+ () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
).toThrowError(
new TypeError(
'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
)
)
- expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.worker).toBe(worker)
- expect(workerNode.info).toStrictEqual({
- id: worker.threadId,
+ expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
+ expect(threadWorkerNode.worker).toBe(threadWorker)
+ expect(threadWorkerNode.info).toStrictEqual({
+ id: threadWorker.threadId,
type: WorkerTypes.thread,
dynamic: false,
ready: false
})
- expect(workerNode.usage).toStrictEqual({
+ expect(threadWorkerNode.usage).toStrictEqual({
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued: 0,
+ maxQueued: 0,
+ stolen: 0,
+ failed: 0
+ },
+ runTime: {
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ history: expect.any(CircularArray)
+ },
+ active: {
+ history: expect.any(CircularArray)
+ }
+ }
+ })
+ expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
+ expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
+ expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(threadWorkerNode.tasksQueue.size).toBe(0)
+ expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
+
+ expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
+ expect(clusterWorkerNode.worker).toBe(clusterWorker)
+ expect(clusterWorkerNode.info).toStrictEqual({
+ id: clusterWorker.id,
+ type: WorkerTypes.cluster,
+ dynamic: false,
+ ready: false
+ })
+ expect(clusterWorkerNode.usage).toStrictEqual({
tasks: {
executed: 0,
executing: 0,
}
}
})
- expect(workerNode.messageChannel).toBeInstanceOf(MessageChannel)
- expect(workerNode.tasksQueueBackPressureSize).toBe(12)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
- expect(workerNode.tasksQueue.size).toBe(0)
- expect(workerNode.taskFunctionsUsage).toBeInstanceOf(Map)
+ expect(clusterWorkerNode.messageChannel).toBeUndefined()
+ expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
+ expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(clusterWorkerNode.tasksQueue.size).toBe(0)
+ expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
})
it('Worker node getTaskFunctionWorkerUsage()', () => {
expect(() =>
- workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
+ threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
).toThrowError(
new TypeError(
"Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list is not yet defined"
)
)
- workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
+ threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1']
expect(() =>
- workerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
+ threadWorkerNode.getTaskFunctionWorkerUsage('invalidTaskFunction')
).toThrowError(
new TypeError(
"Cannot get task function worker usage for task function name 'invalidTaskFunction' when task function names list has less than 3 elements"
)
)
- workerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
+ threadWorkerNode.info.taskFunctions = [DEFAULT_TASK_NAME, 'fn1', 'fn2']
expect(
- workerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
+ threadWorkerNode.getTaskFunctionWorkerUsage(DEFAULT_TASK_NAME)
).toStrictEqual({
tasks: {
executed: 0,
}
}
})
- expect(workerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
+ expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn1')).toStrictEqual({
tasks: {
executed: 0,
executing: 0,
}
}
})
- expect(workerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
+ expect(threadWorkerNode.getTaskFunctionWorkerUsage('fn2')).toStrictEqual({
tasks: {
executed: 0,
executing: 0,
}
}
})
- expect(workerNode.taskFunctionsUsage.size).toBe(2)
+ expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
})
})