## [Unreleased]
+### Changed
+
+- TypeScript breaking change: merge ThreadPoolOptions and ClusterPoolOptions types into PoolOptions type.
+
## [3.0.14] - 2023-12-13
### Fixed
- [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
- [`pool.setDefaultTaskFunction(name)`](#poolsetdefaulttaskfunctionname)
- [`PoolOptions`](#pooloptions)
- - [`ThreadPoolOptions extends PoolOptions`](#threadpooloptions-extends-pooloptions)
- - [`ClusterPoolOptions extends PoolOptions`](#clusterpooloptions-extends-pooloptions)
- [Worker](#worker)
- [`class YourWorker extends ThreadWorker/ClusterWorker`](#class-yourworker-extends-threadworkerclusterworker)
- [`YourWorker.hasTaskFunction(name)`](#yourworkerhastaskfunctionname)
Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
-#### `ThreadPoolOptions extends PoolOptions`
-
- `workerOptions` (optional) - An object with the worker options to pass to worker. See [worker_threads](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) for more details.
-#### `ClusterPoolOptions extends PoolOptions`
-
- `env` (optional) - An object with the environment variables to pass to worker. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_fork_env) for more details.
- `settings` (optional) - An object with the cluster settings. See [cluster](https://nodejs.org/api/cluster.html#cluster_cluster_settings) for more details.
export type { AbstractPool } from './pools/abstract-pool'
export { DynamicClusterPool } from './pools/cluster/dynamic'
-export {
- FixedClusterPool,
- type ClusterPoolOptions
-} from './pools/cluster/fixed'
+export { FixedClusterPool } from './pools/cluster/fixed'
export { PoolEvents, PoolTypes } from './pools/pool'
export type {
IPool,
StrategyData,
TaskStatistics,
WorkerInfo,
+ WorkerNodeOptions,
WorkerType,
WorkerUsage
} from './pools/worker'
} from './pools/selection-strategies/selection-strategies-types'
export type { WorkerChoiceStrategyContext } from './pools/selection-strategies/worker-choice-strategy-context'
export { DynamicThreadPool } from './pools/thread/dynamic'
-export { FixedThreadPool, type ThreadPoolOptions } from './pools/thread/fixed'
+export { FixedThreadPool } from './pools/thread/fixed'
export type { AbstractWorker } from './worker/abstract-worker'
export { ClusterWorker } from './worker/cluster-worker'
export { ThreadWorker } from './worker/thread-worker'
transferList?: TransferListItem[]
): void
- /**
- * Creates a new worker.
- *
- * @returns Newly created worker.
- */
- protected abstract createWorker (): Worker
-
/**
* Creates a new, completely set up worker node.
*
* @returns New, completely set up worker node key.
*/
protected createAndSetupWorkerNode (): number {
- const worker = this.createWorker()
-
- worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
- worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
- worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
- worker.on('error', error => {
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNode = this.createWorkerNode()
+ workerNode.registerWorkerEventHandler(
+ 'online',
+ this.opts.onlineHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'message',
+ this.opts.messageHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler(
+ 'error',
+ this.opts.errorHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerWorkerEventHandler('error', (error: Error) => {
+ const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
this.flagWorkerNodeAsNotReady(workerNodeKey)
const workerInfo = this.getWorkerInfo(workerNodeKey)
this.emitter?.emit(PoolEvents.error, error)
this.redistributeQueuedTasks(workerNodeKey)
}
})
- worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
- worker.once('exit', () => {
- this.removeWorkerNode(worker)
+ workerNode.registerWorkerEventHandler(
+ 'exit',
+ this.opts.exitHandler ?? EMPTY_FUNCTION
+ )
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
+ this.removeWorkerNode(workerNode.worker)
})
-
- const workerNodeKey = this.addWorkerNode(worker)
-
+ const workerNodeKey = this.addWorkerNode(workerNode)
this.afterWorkerNodeSetup(workerNodeKey)
-
return workerNodeKey
}
}
/**
- * Adds the given worker in the pool worker nodes.
+ * Creates a worker node.
*
- * @param worker - The worker.
- * @returns The added worker node key.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ * @returns The created worker node.
*/
- private addWorkerNode (worker: Worker): number {
+ private createWorkerNode (): IWorkerNode<Worker, Data> {
const workerNode = new WorkerNode<Worker, Data>(
- worker,
- this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ this.worker,
+ this.filePath,
+ {
+ env: this.opts.env,
+ workerOptions: this.opts.workerOptions,
+ tasksQueueBackPressureSize:
+ this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
+ }
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
}
+ return workerNode
+ }
+
+ /**
+ * Adds the given worker node in the pool worker nodes.
+ *
+ * @param workerNode - The worker node.
+ * @returns The added worker node key.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
+ */
+ private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
this.workerNodes.push(workerNode)
- const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+ const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey === -1) {
throw new Error('Worker added not found in worker nodes')
}
}
/**
- * Removes the given worker from the pool worker nodes.
+ * Removes the worker node associated to the give given worker from the pool worker nodes.
*
* @param worker - The worker.
*/
-import { type PoolType, PoolTypes } from '../pool'
+import { type Worker } from 'node:cluster'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
import { checkDynamicPoolSize } from '../utils'
-import { type ClusterPoolOptions, FixedClusterPool } from './fixed'
+import { FixedClusterPool } from './fixed'
/**
* A cluster pool with a dynamic number of workers, but a guaranteed minimum number of workers.
min: number,
protected readonly max: number,
filePath: string,
- opts: ClusterPoolOptions = {}
+ opts: PoolOptions<Worker> = {}
) {
super(min, filePath, opts)
checkDynamicPoolSize(this.numberOfWorkers, this.max)
-import cluster, { type ClusterSettings, type Worker } from 'node:cluster'
+import cluster, { type Worker } from 'node:cluster'
import type { MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
import { type WorkerType, WorkerTypes } from '../worker'
-/**
- * Options for a poolifier cluster pool.
- */
-export interface ClusterPoolOptions extends PoolOptions<Worker> {
- /**
- * Key/value pairs to add to worker process environment.
- *
- * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
- */
- env?: Record<string, unknown>
- /**
- * Cluster settings.
- *
- * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
- */
- settings?: ClusterSettings
-}
-
/**
* A cluster pool with a fixed number of workers.
*
public constructor (
numberOfWorkers: number,
filePath: string,
- protected readonly opts: ClusterPoolOptions = {}
+ protected readonly opts: PoolOptions<Worker> = {}
) {
super(numberOfWorkers, filePath, opts)
}
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
const workerNode = this.workerNodes[workerNodeKey]
- const worker = workerNode.worker
const waitWorkerExit = new Promise<void>(resolve => {
- worker.once('exit', () => {
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
resolve()
})
})
- worker.once('disconnect', () => {
- worker.kill()
+ workerNode.registerOnceWorkerEventHandler('disconnect', () => {
+ workerNode.worker.kill()
})
await this.sendKillMessageToWorker(workerNodeKey)
workerNode.removeAllListeners()
- worker.disconnect()
+ workerNode.worker.disconnect()
await waitWorkerExit
}
this.workerNodes[workerNodeKey].worker.off('message', listener)
}
- /** @inheritDoc */
- protected createWorker (): Worker {
- return cluster.fork(this.opts.env)
- }
-
/** @inheritDoc */
protected get type (): PoolType {
return PoolTypes.fixed
import type { TransferListItem } from 'node:worker_threads'
import type { EventEmitterAsyncResource } from 'node:events'
+import type { ClusterSettings } from 'node:cluster'
import type { TaskFunction } from '../worker/task-functions'
import type {
ErrorHandler,
* Pool worker node tasks queue options.
*/
tasksQueueOptions?: TasksQueueOptions
+ /**
+ * Worker options.
+ *
+ * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
+ */
+ workerOptions?: WorkerOptions
+ /**
+ * Key/value pairs to add to worker process environment.
+ *
+ * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
+ */
+ env?: Record<string, unknown>
+ /**
+ * Cluster settings.
+ *
+ * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
+ */
+ settings?: ClusterSettings
}
/**
-import { type PoolType, PoolTypes } from '../pool'
+import { type Worker } from 'node:worker_threads'
+import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
import { checkDynamicPoolSize } from '../utils'
-import { FixedThreadPool, type ThreadPoolOptions } from './fixed'
+import { FixedThreadPool } from './fixed'
/**
* A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads.
min: number,
protected readonly max: number,
filePath: string,
- opts: ThreadPoolOptions = {}
+ opts: PoolOptions<Worker> = {}
) {
super(min, filePath, opts)
checkDynamicPoolSize(this.numberOfWorkers, this.max)
import {
type MessageChannel,
type MessagePort,
- SHARE_ENV,
type TransferListItem,
- Worker,
- type WorkerOptions,
+ type Worker,
isMainThread
} from 'node:worker_threads'
import type { MessageValue } from '../../utility-types'
import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
import { type WorkerType, WorkerTypes } from '../worker'
-/**
- * Options for a poolifier thread pool.
- */
-export interface ThreadPoolOptions extends PoolOptions<Worker> {
- /**
- * Worker options.
- *
- * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
- */
- workerOptions?: WorkerOptions
-}
-
/**
* A thread pool with a fixed number of threads.
*
public constructor (
numberOfThreads: number,
filePath: string,
- protected readonly opts: ThreadPoolOptions = {}
+ protected readonly opts: PoolOptions<Worker> = {}
) {
super(numberOfThreads, filePath, opts)
}
this.flushTasksQueue(workerNodeKey)
// FIXME: wait for tasks to be finished
const workerNode = this.workerNodes[workerNodeKey]
- const worker = workerNode.worker
const waitWorkerExit = new Promise<void>(resolve => {
- worker.once('exit', () => {
+ workerNode.registerOnceWorkerEventHandler('exit', () => {
resolve()
})
})
await this.sendKillMessageToWorker(workerNodeKey)
workerNode.closeChannel()
workerNode.removeAllListeners()
- await worker.terminate()
+ await workerNode.worker.terminate()
await waitWorkerExit
}
)
}
- /** @inheritDoc */
- protected createWorker (): Worker {
- return new Worker(this.filePath, {
- env: SHARE_ENV,
- ...this.opts.workerOptions
- })
- }
-
/** @inheritDoc */
protected get type (): PoolType {
return PoolTypes.fixed
import { existsSync } from 'node:fs'
+import cluster from 'node:cluster'
+import { SHARE_ENV, Worker, type WorkerOptions } from 'node:worker_threads'
import { average, isPlainObject, max, median, min } from '../utils'
import {
type MeasurementStatisticsRequirements,
type WorkerChoiceStrategy
} from './selection-strategies/selection-strategies-types'
import type { TasksQueueOptions } from './pool'
-import type { IWorker, MeasurementStatistics } from './worker'
+import {
+ type IWorker,
+ type MeasurementStatistics,
+ type WorkerNodeOptions,
+ type WorkerType,
+ WorkerTypes
+} from './worker'
export const checkFilePath = (filePath: string): void => {
+ if (filePath == null) {
+ throw new TypeError('The worker file path must be specified')
+ }
+ if (typeof filePath !== 'string') {
+ throw new TypeError('The worker file path must be a string')
+ }
if (!existsSync(filePath)) {
throw new Error(`Cannot find the worker file '${filePath}'`)
}
}
}
-export const checkWorkerNodeArguments = <Worker extends IWorker>(
- worker: Worker,
- tasksQueueBackPressureSize: number
+export const checkWorkerNodeArguments = (
+ type: WorkerType,
+ filePath: string,
+ opts: WorkerNodeOptions
): void => {
- if (worker == null) {
- throw new TypeError('Cannot construct a worker node without a worker')
+ if (type == null) {
+ throw new TypeError('Cannot construct a worker node without a worker type')
+ }
+ if (!Object.values(WorkerTypes).includes(type)) {
+ throw new TypeError(
+ `Cannot construct a worker node with an invalid worker type '${type}'`
+ )
}
- if (tasksQueueBackPressureSize == null) {
+ checkFilePath(filePath)
+ if (opts == null) {
throw new TypeError(
- 'Cannot construct a worker node without a tasks queue back pressure size'
+ 'Cannot construct a worker node without worker node options'
)
}
- if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
+ if (opts != null && !isPlainObject(opts)) {
throw new TypeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+ 'Cannot construct a worker node with invalid options: must be a plain object'
)
}
- if (tasksQueueBackPressureSize <= 0) {
+ if (opts.tasksQueueBackPressureSize == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue back pressure size option'
+ )
+ }
+ if (!Number.isSafeInteger(opts.tasksQueueBackPressureSize)) {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+ )
+ }
+ if (opts.tasksQueueBackPressureSize <= 0) {
throw new RangeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
}
}
}
}
}
+
+export const createWorker = <Worker extends IWorker>(
+ type: WorkerType,
+ filePath: string,
+ opts: { env?: Record<string, unknown>, workerOptions?: WorkerOptions }
+): Worker => {
+ switch (type) {
+ case WorkerTypes.thread:
+ return new Worker(filePath, {
+ env: SHARE_ENV,
+ ...opts?.workerOptions
+ }) as unknown as Worker
+ case WorkerTypes.cluster:
+ return cluster.fork(opts?.env) as unknown as Worker
+ default:
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ throw new Error(`Unknown worker type '${type}'`)
+ }
+}
import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
import { Deque } from '../deque'
import {
+ type ErrorHandler,
+ type ExitHandler,
type IWorker,
type IWorkerNode,
+ type MessageHandler,
+ type OnlineHandler,
type StrategyData,
type WorkerInfo,
+ type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
type WorkerUsage
} from './worker'
-import { checkWorkerNodeArguments } from './utils'
+import { checkWorkerNodeArguments, createWorker } from './utils'
/**
* Worker node.
/**
* Constructs a new worker node.
*
- * @param worker - The worker.
- * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
+ * @param type - The worker type.
+ * @param filePath - The worker file path.
+ * @param opts - The worker node options.
*/
- constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+ constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
super()
- checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
- this.worker = worker
- this.info = this.initWorkerInfo(worker)
+ checkWorkerNodeArguments(type, filePath, opts)
+ this.worker = createWorker<Worker>(type, filePath, {
+ env: opts.env,
+ workerOptions: opts.workerOptions
+ })
+ this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
if (this.info.type === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
- this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
this.onBackPressureStarted = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
}
+ /** @inheritdoc */
+ public registerWorkerEventHandler (
+ event: string,
+ listener:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ): void {
+ this.worker.on(event, listener)
+ }
+
+ /** @inheritdoc */
+ public registerOnceWorkerEventHandler (
+ event: string,
+ listener:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ): void {
+ this.worker.once(event, listener)
+ }
+
/** @inheritdoc */
public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
if (!Array.isArray(this.info.taskFunctionNames)) {
* @param event - The event.
* @param handler - The event handler.
*/
- readonly on: ((event: 'online', handler: OnlineHandler<this>) => void) &
- ((event: 'message', handler: MessageHandler<this>) => void) &
- ((event: 'error', handler: ErrorHandler<this>) => void) &
- ((event: 'exit', handler: ExitHandler<this>) => void)
- /**
- * Registers a listener to the exit event that will only be performed once.
+ readonly on: (
+ event: string,
+ handler:
+ | OnlineHandler<this>
+ | MessageHandler<this>
+ | ErrorHandler<this>
+ | ExitHandler<this>
+ ) => void
+ /**
+ * Registers once an event listener.
*
- * @param event - The `'exit'` event.
- * @param handler - The exit handler.
+ * @param event - The event.
+ * @param handler - The event handler.
*/
- readonly once: (event: 'exit', handler: ExitHandler<this>) => void
+ readonly once: (
+ event: string,
+ handler:
+ | OnlineHandler<this>
+ | MessageHandler<this>
+ | ErrorHandler<this>
+ | ExitHandler<this>
+ ) => void
}
/**
- * Worker node event detail.
+ * Worker node options.
*
* @internal
*/
-export interface WorkerNodeEventDetail {
- workerId: number
- workerNodeKey?: number
+export interface WorkerNodeOptions {
+ workerOptions?: WorkerOptions
+ env?: Record<string, unknown>
+ tasksQueueBackPressureSize: number
}
/**
* Closes communication channel.
*/
readonly closeChannel: () => void
+ /**
+ * Registers a worker event handler.
+ *
+ * @param event - The event.
+ * @param listener - The event listener.
+ */
+ readonly registerWorkerEventHandler: (
+ event: string,
+ listener:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ) => void
+ /**
+ * Registers once a worker event handler.
+ *
+ * @param event - The event.
+ * @param listener - The event listener.
+ */
+ readonly registerOnceWorkerEventHandler: (
+ event: string,
+ listener:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ) => void
/**
* Gets task function worker usage statistics.
*
*/
readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
}
+
+/**
+ * Worker node event detail.
+ *
+ * @internal
+ */
+export interface WorkerNodeEventDetail {
+ workerId: number
+ workerNodeKey?: number
+}
it('Verify that filePath is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
- new Error("Cannot find the worker file 'undefined'")
+ new TypeError('The worker file path must be specified')
+ )
+ expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
+ new TypeError('The worker file path must be a string')
)
expect(
() => new FixedThreadPool(numberOfWorkers, './dummyWorker.ts')
it('Validation of inputs test', () => {
expect(() => new DynamicClusterPool(min)).toThrow(
- "Cannot find the worker file 'undefined'"
+ 'The worker file path must be specified'
)
})
it('Validation of inputs test', () => {
expect(() => new DynamicThreadPool(min)).toThrow(
- "Cannot find the worker file 'undefined'"
+ 'The worker file path must be specified'
)
})
+import { Worker as ThreadWorker } from 'node:worker_threads'
+import { Worker as ClusterWorker } from 'node:cluster'
import { expect } from 'expect'
import {
CircularArray,
DEFAULT_CIRCULAR_ARRAY_SIZE
} from '../../lib/circular-array.js'
-import { updateMeasurementStatistics } from '../../lib/pools/utils.js'
+import {
+ createWorker,
+ updateMeasurementStatistics
+} from '../../lib/pools/utils.js'
+import { WorkerTypes } from '../../lib/index.js'
describe('Pool utils test suite', () => {
it('Verify updateMeasurementStatistics() behavior', () => {
)
})
})
+
+ it('Verify createWorker() behavior', () => {
+ expect(
+ createWorker(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ ).toBeInstanceOf(ThreadWorker)
+ expect(
+ createWorker(
+ WorkerTypes.cluster,
+ './tests/worker-files/cluster/testWorker.mjs'
+ )
+ ).toBeInstanceOf(ClusterWorker)
+ })
})
-import { MessageChannel, Worker } from 'node:worker_threads'
-import cluster from 'node:cluster'
+import { MessageChannel } from 'node:worker_threads'
import { expect } from 'expect'
import { WorkerNode } from '../../lib/pools/worker-node.js'
import { WorkerTypes } from '../../lib/index.js'
import { DEFAULT_TASK_NAME } from '../../lib/utils.js'
describe('Worker node test suite', () => {
- const threadWorker = new Worker('./tests/worker-files/thread/testWorker.mjs')
- const clusterWorker = cluster.fork()
- const threadWorkerNode = new WorkerNode(threadWorker, 12)
- const clusterWorkerNode = new WorkerNode(clusterWorker, 12)
+ const threadWorkerNode = new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 12 }
+ )
+ const clusterWorkerNode = new WorkerNode(
+ WorkerTypes.cluster,
+ './tests/worker-files/cluster/testWorker.js',
+ { tasksQueueBackPressureSize: 12 }
+ )
it('Worker node instantiation', () => {
expect(() => new WorkerNode()).toThrow(
- new TypeError('Cannot construct a worker node without a worker')
+ new TypeError('Cannot construct a worker node without a worker type')
)
- expect(() => new WorkerNode(threadWorker)).toThrow(
+ expect(
+ () =>
+ new WorkerNode(
+ 'invalidWorkerType',
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 12 }
+ )
+ ).toThrow(
+ new TypeError(
+ "Cannot construct a worker node with an invalid worker type 'invalidWorkerType'"
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ ).toThrow(
new TypeError(
- 'Cannot construct a worker node without a tasks queue back pressure size'
+ 'Cannot construct a worker node without worker node options'
)
)
expect(
- () => new WorkerNode(threadWorker, 'invalidTasksQueueBackPressureSize')
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ ''
+ )
).toThrow(
new TypeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+ 'Cannot construct a worker node with invalid options: must be a plain object'
)
)
- expect(() => new WorkerNode(threadWorker, 0.2)).toThrow(
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ {}
+ )
+ ).toThrow(
new TypeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
+ 'Cannot construct a worker node without a tasks queue back pressure size option'
)
)
- expect(() => new WorkerNode(threadWorker, 0)).toThrow(
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 'invalidTasksQueueBackPressureSize' }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 0.2 }
+ )
+ ).toThrow(
+ new TypeError(
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not an integer'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: 0 }
+ )
+ ).toThrow(
new RangeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
)
- expect(() => new WorkerNode(threadWorker, -1)).toThrow(
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ { tasksQueueBackPressureSize: -1 }
+ )
+ ).toThrow(
new RangeError(
- 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
+ 'Cannot construct a worker node with a tasks queue back pressure size option that is not a positive integer'
)
)
expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
- expect(threadWorkerNode.worker).toBe(threadWorker)
expect(threadWorkerNode.info).toStrictEqual({
- id: threadWorker.threadId,
+ id: threadWorkerNode.worker.threadId,
type: WorkerTypes.thread,
dynamic: false,
ready: false
expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map)
expect(clusterWorkerNode).toBeInstanceOf(WorkerNode)
- expect(clusterWorkerNode.worker).toBe(clusterWorker)
expect(clusterWorkerNode.info).toStrictEqual({
- id: clusterWorker.id,
+ id: clusterWorkerNode.worker.id,
type: WorkerTypes.cluster,
dynamic: false,
ready: false