const Benchmark = require('benchmark')
+const { dynamicClusterTest } = require('./cluster/dynamic')
+const { fixedClusterTest } = require('./cluster/fixed')
+const { dynamicThreadTest } = require('./thread/dynamic')
+const { fixedThreadTest } = require('./thread/fixed')
+
const suite = new Benchmark.Suite()
-const { FixedThreadPool } = require('../lib/index')
-const { DynamicThreadPool } = require('../lib/index')
-const size = 30
-const tasks = 1
const LIST_FORMATTER = new Intl.ListFormat('en-US', {
style: 'long',
type: 'conjunction'
})
-// pools
-const fixedPool = new FixedThreadPool(size, './threadWorker.js', {
- maxTasks: 10000
-})
-const dynamicPool = new DynamicThreadPool(
- size / 2,
- size * 3,
- './threadWorker.js',
- { maxTasks: 10000 }
-)
-const workerData = { proof: 'ok' }
-
// wait some seconds before start, my pools need to load threads !!!
setTimeout(async () => {
test()
}, 3000)
-// fixed pool proof
-async function fixedTest () {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- fixedPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => {
- console.error(err)
- })
- }
- })
-}
-
-async function dynamicTest () {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 0; i <= tasks; i++) {
- dynamicPool
- .execute(workerData)
- .then(res => {
- executions++
- if (executions === tasks) {
- return resolve('FINISH')
- }
- return null
- })
- .catch(err => console.error(err))
- }
- })
-}
-
async function test () {
// add tests
suite
- .add('PioardiStaticPool', async function () {
- await fixedTest()
+ .add('Pioardi:Static:ThreadPool', async function () {
+ await fixedThreadTest()
+ })
+ .add('Pioardi:Dynamic:ThreadPool', async function () {
+ await dynamicThreadTest()
+ })
+ .add('Pioardi:Static:ClusterPool', async function () {
+ await fixedClusterTest()
})
- .add('PioardiDynamicPool', async function () {
- await dynamicTest()
+ .add('Pioardi:Dynamic:ClusterPool', async function () {
+ await dynamicClusterTest()
})
// add listeners
.on('cycle', function (event) {
--- /dev/null
+const { DynamicClusterPool } = require('../../lib/index')
+
+const size = 30
+
+const dynamicPool = new DynamicClusterPool(
+ size / 2,
+ size * 3,
+ './benchmarks/cluster/worker.js',
+ {
+ maxTasks: 10000
+ }
+)
+
+async function dynamicClusterTest (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ dynamicPool
+ .execute(workerData)
+ .then(res => {
+ executions++
+ if (executions === tasks) {
+ return resolve('FINISH')
+ }
+ return null
+ })
+ .catch(err => console.error(err))
+ }
+ })
+}
+
+module.exports = { dynamicClusterTest }
--- /dev/null
+const { FixedClusterPool } = require('../../lib/index')
+
+const size = 30
+
+const fixedPool = new FixedClusterPool(size, './benchmarks/cluster/worker.js', {
+ maxTasks: 10000
+})
+
+async function fixedClusterTest (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ fixedPool
+ .execute(workerData)
+ .then(res => {
+ executions++
+ if (executions === tasks) {
+ return resolve('FINISH')
+ }
+ return null
+ })
+ .catch(err => {
+ console.error(err)
+ })
+ }
+ })
+}
+
+module.exports = { fixedClusterTest }
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../lib/index')
+
+function yourFunction (data) {
+ for (let i = 0; i <= 1000; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ // console.log('This is the main thread ' + isMainThread)
+ return { ok: 1 }
+}
+
+module.exports = new ClusterWorker(yourFunction)
// pools
const workerThreadsPool = new WorkerThreadsPool({ max: size })
-const workerPool = workerpool.pool('./workerpoolWorker.js', {
+const workerPool = workerpool.pool('./external/workerpoolWorker.js', {
minWorkers: size / 2,
maxWorkers: size * 3,
workerType: 'thread'
})
-const fixedPool = new FixedThreadPool(size, './threadWorker.js', {
+const fixedPool = new FixedThreadPool(size, './thread/worker.js', {
maxTasks: 10000
})
const dynamicPool = new DynamicThreadPool(
size / 2,
size * 3,
- './threadWorker.js',
+ './thread/worker.js',
{ maxTasks: 10000 }
)
for (let i = 0; i <= tasks; i++) {
new Promise((resolve, reject) => {
workerThreadsPool.acquire(
- './workerThreadsWorker.js',
+ './external/workerThreadsWorker.js',
{ workerData: workerData },
(err, worker) => {
if (err) {
--- /dev/null
+const { DynamicThreadPool } = require('../../lib/index')
+
+const size = 30
+
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js', {
+ maxTasks: 10000
+})
+
+async function dynamicThreadTest (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ dynamicPool
+ .execute(workerData)
+ .then(res => {
+ executions++
+ if (executions === tasks) {
+ return resolve('FINISH')
+ }
+ return null
+ })
+ .catch(err => console.error(err))
+ }
+ })
+}
+
+module.exports = { dynamicThreadTest }
--- /dev/null
+const { FixedThreadPool } = require('../../lib/index')
+
+const size = 30
+
+const fixedPool = new FixedThreadPool(size, './worker.js', {
+ maxTasks: 10000
+})
+
+async function fixedThreadTest (
+ { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ fixedPool
+ .execute(workerData)
+ .then(res => {
+ executions++
+ if (executions === tasks) {
+ return resolve('FINISH')
+ }
+ return null
+ })
+ .catch(err => {
+ console.error(err)
+ })
+ }
+ })
+}
+
+module.exports = { fixedThreadTest }
'use strict'
-const { ThreadWorker } = require('../lib/index')
+const { ThreadWorker } = require('../../lib/index')
function yourFunction (data) {
for (let i = 0; i <= 1000; i++) {
}
},
"graceful-fs": {
- "version": "4.2.6",
- "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
- "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==",
+ "version": "4.2.5",
+ "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.5.tgz",
+ "integrity": "sha512-kBBSQbz2K0Nyn+31j/w36fUfxkBW9/gfwRWdUY1ULReH3iokVJgddZAFcD1D0xlgTmFxJCbUkUclAlc6/IDJkw==",
"dev": true
},
"graphql": {
"integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==",
"dev": true
},
- "queue-microtask": {
- "version": "1.2.2",
- "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.2.tgz",
- "integrity": "sha512-dB15eXv3p2jDlbOiNLyMabYg1/sXvppd8DP2J3EOCQ0AkuSXCW2tP7mnVouVLJKgUMY6yP0kcQDVpLCN13h4Xg==",
- "dev": true
- },
"randombytes": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/randombytes/-/randombytes-2.1.0.tgz",
}
},
"run-parallel": {
- "version": "1.2.0",
- "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.2.0.tgz",
- "integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
- "dev": true,
- "requires": {
- "queue-microtask": "^1.2.2"
- }
+ "version": "1.1.10",
+ "resolved": "https://registry.npmjs.org/run-parallel/-/run-parallel-1.1.10.tgz",
+ "integrity": "sha512-zb/1OuZ6flOlH6tQyMPUrE3x3Ulxjlo9WIVXR4yVYi4H9UXQaeIsPbLn2R3O3vQCnDKkAl2qHiuocKKX4Tz/Sw==",
+ "dev": true
},
"safe-buffer": {
"version": "5.2.1",
-import { DynamicThreadPool } from './dynamic'
-import { FixedThreadPool } from './fixed'
-import { ThreadWorker } from './workers'
+import { DynamicClusterPool } from './pools/cluster/dynamic'
+import { FixedClusterPool } from './pools/cluster/fixed'
+import { DynamicThreadPool } from './pools/thread/dynamic'
+import { FixedThreadPool } from './pools/thread/fixed'
+import { ClusterWorker } from './worker/cluster-worker'
+import { ThreadWorker } from './worker/thread-worker'
-export { DynamicThreadPoolOptions } from './dynamic'
-export {
- Draft,
+export type { DynamicClusterPoolOptions } from './pools/cluster/dynamic'
+export type {
+ FixedClusterPoolOptions,
+ WorkerWithMessageChannel as ClusterWorkerWithMessageChannel
+} from './pools/cluster/fixed'
+export type { DynamicThreadPoolOptions } from './pools/thread/dynamic'
+export type {
FixedThreadPoolOptions,
- WorkerWithMessageChannel
-} from './fixed'
-export { ThreadWorkerOptions } from './workers'
-export { FixedThreadPool, DynamicThreadPool, ThreadWorker }
+ WorkerWithMessageChannel as ThreadWorkerWithMessageChannel
+} from './pools/thread/fixed'
+export type { WorkerOptions } from './worker/worker-options'
+export {
+ FixedThreadPool,
+ FixedClusterPool,
+ DynamicClusterPool,
+ DynamicThreadPool,
+ ThreadWorker,
+ ClusterWorker
+}
--- /dev/null
+import { EventEmitter } from 'events'
+import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed'
+import { FixedClusterPool } from './fixed'
+
+class MyEmitter extends EventEmitter {}
+
+export type DynamicClusterPoolOptions = FixedClusterPoolOptions
+
+/**
+ * A cluster pool with a min/max number of workers; it is possible to execute tasks in sync or async mode as you prefer.
+ *
+ * This cluster pool creates new workers when the existing ones are busy, up to the max number of workers.
+ * When the max number of workers is reached, an event is emitted; if you want to listen to this event, use the pool's `emitter`.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
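+ *
+ * @example
+ * // Usage sketch (illustrative only; the worker file path and pool sizes are placeholders):
+ * const pool = new DynamicClusterPool(5, 15, './yourWorker.js', { maxTasks: 10000 })
+ * pool.emitter.on('FullPool', () => console.log('Pool reached its max size'))
+ * pool.execute({ some: 'data' }).then(response => console.log(response))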
+ */
+export class DynamicClusterPool<
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Data = any,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ Response = any
+> extends FixedClusterPool<Data, Response> {
+ public readonly emitter: MyEmitter
+
+ /**
+   * @param min Minimum number of workers that will always be active.
+   * @param max Maximum number of workers that can be active.
+   * @param filename Path to a file with a `ClusterWorker` implementation; a relative path is fine.
+   * @param opts An object with pool options such as `errorHandler` and `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ */
+ public constructor (
+ public readonly min: number,
+ public readonly max: number,
+ public readonly filename: string,
+ public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 }
+ ) {
+ super(min, filename, opts)
+
+ this.emitter = new MyEmitter()
+ }
+
+ protected chooseWorker (): WorkerWithMessageChannel {
+ let worker: WorkerWithMessageChannel | undefined
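+    // look for a worker with no pending tasks (a task count of 0 means it is free)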
+ for (const entry of this.tasks) {
+ if (entry[1] === 0) {
+ worker = entry[0]
+ break
+ }
+ }
+
+ if (worker) {
+ // a worker is free, use it
+ return worker
+ } else {
+ if (this.workers.length === this.max) {
+ this.emitter.emit('FullPool')
+ return super.chooseWorker()
+ }
+    // all workers are busy, create a new worker
+ const worker = this.newWorker()
+ worker.on('message', (message: { kill?: number }) => {
+ if (message.kill) {
+ worker.send({ kill: 1 })
+ worker.kill()
+ // clean workers from data structures
+ const workerIndex = this.workers.indexOf(worker)
+ this.workers.splice(workerIndex, 1)
+ this.tasks.delete(worker)
+ }
+ })
+ return worker
+ }
+ }
+}
--- /dev/null
+import type { SendHandle } from 'child_process'
+import { fork, isMaster, setupMaster, Worker } from 'cluster'
+import type { MessageValue } from '../../utility-types'
+
+export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
+
+export interface FixedClusterPoolOptions {
+ /**
+ * A function that will listen for error event on each worker.
+ */
+ errorHandler?: (this: Worker, e: Error) => void
+ /**
+ * A function that will listen for online event on each worker.
+ */
+ onlineHandler?: (this: Worker) => void
+ /**
+ * A function that will listen for exit event on each worker.
+ */
+ exitHandler?: (this: Worker, code: number) => void
+ /**
+   * This is just to avoid unnecessary warning messages; it is used to set `maxListeners` on the event emitters (workers are event emitters).
+ *
+ * @default 1000
+ */
+ maxTasks?: number
+ /**
+ * Key/value pairs to add to worker process environment.
+ *
+ * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
+ */
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ env?: any
+}
+
+/**
+ * A cluster pool with a static number of workers; it is possible to execute tasks in sync or async mode as you prefer.
+ *
+ * This pool selects the workers in a round robin fashion.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
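+ *
+ * @example
+ * // Usage sketch (illustrative only; the worker file path and pool size are placeholders):
+ * const pool = new FixedClusterPool(15, './yourWorker.js', { maxTasks: 10000 })
+ * pool.execute({ some: 'data' }).then(response => console.log(response))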
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class FixedClusterPool<Data = any, Response = any> {
+ public readonly workers: WorkerWithMessageChannel[] = []
+ public nextWorker: number = 0
+
+  // worker instance as key and the number of tasks assigned to it as value
+ public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
+ WorkerWithMessageChannel,
+ number
+ >()
+
+ protected id: number = 0
+
+ /**
+ * @param numWorkers Number of workers for this pool.
+   * @param filePath Path to a file with a `ClusterWorker` implementation; a relative path is fine.
+   * @param opts An object with pool options such as `errorHandler` and `onlineHandler`. Default: `{ maxTasks: 1000 }`
+ */
+ public constructor (
+ public readonly numWorkers: number,
+ public readonly filePath: string,
+ public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
+ ) {
+ if (!isMaster) {
+ throw new Error('Cannot start a cluster pool from a worker!')
+ }
+ // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
+ if (!this.filePath) {
+ throw new Error('Please specify a file with a worker implementation')
+ }
+
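+    // configure the cluster module so that forked workers run the given worker file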
+ setupMaster({
+ exec: this.filePath
+ })
+
+ for (let i = 1; i <= this.numWorkers; i++) {
+ this.newWorker()
+ }
+ }
+
+ public destroy (): void {
+ for (const worker of this.workers) {
+ worker.kill()
+ }
+ }
+
+ /**
+   * Execute the task specified in the constructor with the given data.
+   *
+   * @param data The input for the specified task.
+ * @returns Promise that is resolved when the task is done.
+ */
+ public execute (data: Data): Promise<Response> {
+ // configure worker to handle message with the specified task
+ const worker: WorkerWithMessageChannel = this.chooseWorker()
+    // console.log('FixedClusterPool#execute chosen worker:', worker)
+ const previousWorkerIndex = this.tasks.get(worker)
+ if (previousWorkerIndex !== undefined) {
+ this.tasks.set(worker, previousWorkerIndex + 1)
+ } else {
+ throw Error('Worker could not be found in tasks map')
+ }
+ const id: number = ++this.id
+ const res: Promise<Response> = this.internalExecute(worker, id)
+ // console.log('FixedClusterPool#execute send data to worker:', worker)
+ worker.send({ data: data || {}, id: id })
+ return res
+ }
+
+ protected internalExecute (
+ worker: WorkerWithMessageChannel,
+ id: number
+ ): Promise<Response> {
+ return new Promise((resolve, reject) => {
+ const listener: (
+ message: MessageValue<Response>,
+ handle: SendHandle
+ ) => void = message => {
+ // console.log('FixedClusterPool#internalExecute listener:', message)
+ if (message.id === id) {
+ worker.removeListener('message', listener)
+ const previousWorkerIndex = this.tasks.get(worker)
+ if (previousWorkerIndex !== undefined) {
+            // the task is done: decrease the pending-task counter for this worker
+            this.tasks.set(worker, previousWorkerIndex - 1)
+ } else {
+ throw Error('Worker could not be found in tasks map')
+ }
+ if (message.error) reject(message.error)
+ else resolve(message.data as Response)
+ }
+ }
+ worker.on('message', listener)
+ })
+ }
+
+ protected chooseWorker (): WorkerWithMessageChannel {
+ if (this.workers.length - 1 === this.nextWorker) {
+ this.nextWorker = 0
+ return this.workers[this.nextWorker]
+ } else {
+ this.nextWorker++
+ return this.workers[this.nextWorker]
+ }
+ }
+
+ protected newWorker (): WorkerWithMessageChannel {
+ const worker: WorkerWithMessageChannel = fork(this.opts.env)
+ worker.on('error', this.opts.errorHandler ?? (() => {}))
+ worker.on('online', this.opts.onlineHandler ?? (() => {}))
+ // TODO handle properly when a worker exit
+ worker.on('exit', this.opts.exitHandler ?? (() => {}))
+ this.workers.push(worker)
+    // we will attach a listener for every task;
+    // when a task is completed the listener will be removed, but to avoid warnings we increase the max listeners size
+ worker.setMaxListeners(this.opts.maxTasks ?? 1000)
+ // init tasks map
+ this.tasks.set(worker, 0)
+ return worker
+ }
+}
import { EventEmitter } from 'events'
-import {
- FixedThreadPool,
- FixedThreadPoolOptions,
- WorkerWithMessageChannel
-} from './fixed'
+import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed'
+import { FixedThreadPool } from './fixed'
class MyEmitter extends EventEmitter {}
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
* @since 0.0.1
*/
-/* eslint-disable @typescript-eslint/no-explicit-any */
export class DynamicThreadPool<
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
Data = any,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
Response = any
> extends FixedThreadPool<Data, Response> {
- /* eslint-enable @typescript-eslint/no-explicit-any */
public readonly emitter: MyEmitter
/**
import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
-
-export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
+import type { Draft, MessageValue } from '../../utility-types'
export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
id: number
): Promise<Response> {
return new Promise((resolve, reject) => {
- const listener = (message: {
- id: number
- error?: string
- data: Response
- }): void => {
+ const listener: (message: MessageValue<Response>) => void = message => {
if (message.id === id) {
worker.port2?.removeListener('message', listener)
const previousWorkerIndex = this.tasks.get(worker)
throw Error('Worker could not be found in tasks map')
}
if (message.error) reject(message.error)
- else resolve(message.data)
+ else resolve(message.data as Response)
}
}
worker.port2?.on('message', listener)
--- /dev/null
+export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
+
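+/**
+ * Message exchanged between a pool and its workers: `data` and `id` carry a task
+ * submission or its result, `error` carries a failure message, `kill` asks a worker
+ * to shut down and `parent` transfers the `MessagePort` used by the thread pool.
+ */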
+export interface MessageValue<Data> {
+ readonly data?: Data
+ readonly id?: number
+ readonly kill?: number
+ readonly error?: string
+ readonly parent?: MessagePort
+}
--- /dev/null
+import { AsyncResource } from 'async_hooks'
+import { isMaster, worker } from 'cluster'
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
+
+/**
+ * An example worker that will always be alive; you just need to **extend** this class if you want a static pool.
+ *
+ * When this worker is inactive for more than 1 minute, it will send this info to the main process.
+ * If you are using a DynamicClusterPool, the extra workers created above the minimum will be killed; the minimum number of workers will be guaranteed.
+ *
+ * @author [Christopher Quadflieg](https://github.com/Shinigami92)
+ * @since 2.0.0
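+ *
+ * @example
+ * // Worker-file sketch (illustrative only; `yourFunction` is a placeholder), mirroring the test workers in this PR:
+ * function yourFunction (data) {
+ *   return { ok: 1 }
+ * }
+ * module.exports = new ClusterWorker(yourFunction, { maxInactiveTime: 60000, async: false })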
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ClusterWorker<Data = any, Response = any> extends AsyncResource {
+ protected readonly maxInactiveTime: number
+ protected readonly async: boolean
+ protected lastTask: number
+ protected readonly interval?: NodeJS.Timeout
+
+ public constructor (
+ fn: (data: Data) => Response,
+ public readonly opts: WorkerOptions = {}
+ ) {
+ super('worker-cluster-pool:pioardi')
+
+ this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60
+ this.async = !!this.opts.async
+ this.lastTask = Date.now()
+ if (!fn) throw new Error('Fn parameter is mandatory')
+ // keep the worker active
+ if (!isMaster) {
+ // console.log('ClusterWorker#constructor', 'is not master')
+ this.interval = setInterval(
+ this.checkAlive.bind(this),
+ this.maxInactiveTime / 2
+ )
+ this.checkAlive.bind(this)()
+ }
+ worker.on('message', (value: MessageValue<Data>) => {
+ // console.log("cluster.on('message', value)", value)
+ if (value?.data && value.id) {
+ // here you will receive messages
+ // console.log('This is the main worker ' + isMaster)
+ if (this.async) {
+ this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+ } else {
+ this.runInAsyncScope(this.run.bind(this), this, fn, value)
+ }
+ } else if (value.kill) {
+ // here is time to kill this worker, just clearing the interval
+ if (this.interval) clearInterval(this.interval)
+ this.emitDestroy()
+ }
+ })
+ }
+
+ protected checkAlive (): void {
+ if (Date.now() - this.lastTask > this.maxInactiveTime) {
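+      // idle for too long: notify the main process so a dynamic pool can kill this worker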
+ worker.send({ kill: 1 })
+ }
+ }
+
+ protected run (
+ fn: (data?: Data) => Response,
+ value: MessageValue<Data>
+ ): void {
+ try {
+ const res = fn(value.data as Data)
+ worker.send({ data: res, id: value.id })
+ this.lastTask = Date.now()
+ } catch (e) {
+ const err = e instanceof Error ? e.message : e
+ worker.send({ error: err, id: value.id })
+ this.lastTask = Date.now()
+ }
+ }
+
+ protected runAsync (
+ fn: (data?: Data) => Promise<Response>,
+ value: MessageValue<Data>
+ ): void {
+ fn(value.data)
+ .then(res => {
+ worker.send({ data: res, id: value.id })
+ this.lastTask = Date.now()
+ return null
+ })
+ .catch(e => {
+ const err = e instanceof Error ? e.message : e
+ worker.send({ error: err, id: value.id })
+ this.lastTask = Date.now()
+ })
+ }
+}
import { AsyncResource } from 'async_hooks'
import { isMainThread, parentPort } from 'worker_threads'
-
-export interface ThreadWorkerOptions {
- /**
- * Max time to wait tasks to work on (in ms), after this period the new worker threads will die.
- *
- * @default 60.000 ms
- */
- maxInactiveTime?: number
- /**
- * `true` if your function contains async pieces, else `false`.
- *
- * @default false
- */
- async?: boolean
-}
+import type { MessageValue } from '../utility-types'
+import type { WorkerOptions } from './worker-options'
/**
* An example worker that will be always alive, you just need to **extend** this class if you want a static pool.
public constructor (
fn: (data: Data) => Response,
- public readonly opts: ThreadWorkerOptions = {}
+ public readonly opts: WorkerOptions = {}
) {
super('worker-thread-pool:pioardi')
)
this.checkAlive.bind(this)()
}
- parentPort?.on(
- 'message',
- (value: {
- data?: Response
- id?: number
- parent?: MessagePort
- kill?: number
- }) => {
- if (value?.data && value.id) {
- // here you will receive messages
- // console.log('This is the main thread ' + isMainThread)
- if (this.async) {
- this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
- } else {
- this.runInAsyncScope(this.run.bind(this), this, fn, value)
- }
- } else if (value.parent) {
- // save the port to communicate with the main thread
- // this will be received once
- this.parent = value.parent
- } else if (value.kill) {
- // here is time to kill this thread, just clearing the interval
- if (this.interval) clearInterval(this.interval)
- this.emitDestroy()
+ parentPort?.on('message', (value: MessageValue<Data>) => {
+ if (value?.data && value.id) {
+ // here you will receive messages
+ // console.log('This is the main thread ' + isMainThread)
+ if (this.async) {
+ this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
+ } else {
+ this.runInAsyncScope(this.run.bind(this), this, fn, value)
}
+ } else if (value.parent) {
+ // save the port to communicate with the main thread
+ // this will be received once
+ this.parent = value.parent
+ } else if (value.kill) {
+ // here is time to kill this thread, just clearing the interval
+ if (this.interval) clearInterval(this.interval)
+ this.emitDestroy()
}
- )
+ })
}
protected checkAlive (): void {
}
protected run (
- fn: (data: Data) => Response,
- value: { readonly data: Data; readonly id: number }
+ fn: (data?: Data) => Response,
+ value: MessageValue<Data>
): void {
try {
const res = fn(value.data)
}
protected runAsync (
- fn: (data: Data) => Promise<Response>,
- value: { readonly data: Data; readonly id: number }
+ fn: (data?: Data) => Promise<Response>,
+ value: MessageValue<Data>
): void {
fn(value.data)
.then(res => {
--- /dev/null
+export interface WorkerOptions {
+ /**
+   * Maximum time (in ms) to wait for tasks to work on; after this period the newly created workers will die.
+   *
+   * @default 60000 ms
+ */
+ maxInactiveTime?: number
+ /**
+ * `true` if your function contains async pieces, else `false`.
+ *
+ * @default false
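+   *
+   * @example
+   * // Sketch: a worker whose function returns a Promise sets `async: true`
+   * // (mirroring the async test workers in this PR); `asyncFn` is a placeholder.
+   * module.exports = new ClusterWorker(asyncFn, { maxInactiveTime: 500, async: true })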
+ */
+ async?: boolean
+}
--- /dev/null
+const expect = require('expect')
+const { DynamicClusterPool } = require('../../../lib/index')
+const min = 1
+const max = 3
+const pool = new DynamicClusterPool(
+ min,
+ max,
+ './tests/worker/cluster/testWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+
+describe('Dynamic cluster pool test suite ', () => {
+ it('Verify that the function is executed in a worker cluster', async () => {
+ const result = await pool.execute({ test: 'test' })
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
+ const promises = []
+ let closedWorkers = 0
+ let fullPool = 0
+ pool.emitter.on('FullPool', () => fullPool++)
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ expect(pool.workers.length).toBeLessThanOrEqual(max)
+ expect(pool.workers.length).toBeGreaterThan(min)
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedWorkers++
+ })
+ })
+ expect(fullPool > 1).toBeTruthy()
+ await new Promise(resolve => setTimeout(resolve, 5000))
+ expect(closedWorkers).toBe(max - min)
+ })
+
+ it('Verify scale worker up and down is working', async () => {
+ expect(pool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ pool.execute({ test: 'test' })
+ }
+ expect(pool.workers.length).toBeGreaterThan(min)
+ await new Promise(resolve => setTimeout(resolve, 3000))
+ expect(pool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ pool.execute({ test: 'test' })
+ }
+ expect(pool.workers.length).toBeGreaterThan(min)
+ await new Promise(resolve => setTimeout(resolve, 2000))
+ expect(pool.workers.length).toBe(min)
+ })
+ it('Shutdown test', async () => {
+ let closedWorkers = 0
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedWorkers++
+ })
+ })
+ pool.destroy()
+ await new Promise(resolve => setTimeout(resolve, 1000))
+ expect(closedWorkers).toBe(min)
+ })
+
+ it('Validations test', () => {
+ let error
+ try {
+ const pool1 = new DynamicClusterPool()
+ console.log(pool1)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeTruthy()
+ expect(error.message).toBeTruthy()
+ })
+
+ it('Should work even without opts in input', async () => {
+ const pool1 = new DynamicClusterPool(
+ 1,
+ 1,
+ './tests/worker/cluster/testWorker.js'
+ )
+ const res = await pool1.execute({ test: 'test' })
+ expect(res).toBeFalsy()
+ })
+})
--- /dev/null
+const expect = require('expect')
+const { FixedClusterPool } = require('../../../lib/index')
+const numWorkers = 10
+const pool = new FixedClusterPool(
+ numWorkers,
+ './tests/worker/cluster/testWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+const emptyPool = new FixedClusterPool(
+ 1,
+ './tests/worker/cluster/emptyWorker.js'
+)
+const echoPool = new FixedClusterPool(1, './tests/worker/cluster/echoWorker.js')
+const errorPool = new FixedClusterPool(
+ 1,
+ './tests/worker/cluster/errorWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+
+const asyncErrorPool = new FixedClusterPool(
+ 1,
+ './tests/worker/cluster/asyncErrorWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+const asyncPool = new FixedClusterPool(
+ 1,
+ './tests/worker/cluster/asyncWorker.js'
+)
+
+describe('Fixed cluster pool test suite ', () => {
+ it('Choose worker round robin test', async () => {
+ const results = new Set()
+ for (let i = 0; i < numWorkers; i++) {
+ results.add(pool.chooseWorker().id)
+ }
+ expect(results.size).toBe(numWorkers)
+ })
+
+ it('Verify that the function is executed in a worker cluster', async () => {
+ const result = await pool.execute({ test: 'test' })
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that is possible to invoke the execute method without input', async () => {
+ const result = await pool.execute()
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that is possible to have a worker that return undefined', async () => {
+ const result = await emptyPool.execute()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that data are sent to the worker correctly', async () => {
+ const data = { f: 10 }
+ const result = await echoPool.execute(data)
+ expect(result).toBeTruthy()
+ expect(result.f).toBe(data.f)
+ })
+
+ it('Verify that error handling is working properly:sync', async () => {
+ const data = { f: 10 }
+ let inError
+ try {
+ await errorPool.execute(data)
+ } catch (e) {
+ inError = e
+ }
+ expect(inError).toBeDefined()
+ expect(typeof inError === 'string').toBeTruthy()
+ expect(inError).toBe('Error Message from ClusterWorker')
+ })
+
+ it('Verify that error handling is working properly:async', async () => {
+ const data = { f: 10 }
+ let inError
+ try {
+ await asyncErrorPool.execute(data)
+ } catch (e) {
+ inError = e
+ }
+ expect(inError).toBeDefined()
+ expect(typeof inError === 'string').toBeTruthy()
+ expect(inError).toBe('Error Message from ClusterWorker:async')
+ })
+
+ it('Verify that async function is working properly', async () => {
+ const data = { f: 10 }
+ const startTime = new Date().getTime()
+ const result = await asyncPool.execute(data)
+ const usedTime = new Date().getTime() - startTime
+ expect(result).toBeTruthy()
+ expect(result.f).toBe(data.f)
+ expect(usedTime).toBeGreaterThanOrEqual(2000)
+ })
+
+ it('Shutdown test', async () => {
+ let closedWorkers = 0
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedWorkers++
+ })
+ })
+ pool.destroy()
+ await new Promise(resolve => setTimeout(resolve, 200))
+ expect(closedWorkers).toBe(numWorkers)
+ })
+
+ it('Validations test', () => {
+ let error
+ try {
+ const pool1 = new FixedClusterPool()
+ console.log(pool1)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeTruthy()
+ expect(error.message).toBeTruthy()
+ })
+
+ it('Should work even without opts in input', async () => {
+ const pool1 = new FixedClusterPool(
+ 1,
+ './tests/worker/cluster/testWorker.js'
+ )
+ const res = await pool1.execute({ test: 'test' })
+ expect(res).toBeFalsy()
+ })
+})
const expect = require('expect')
-const { DynamicThreadPool } = require('../lib/dynamic')
+const { DynamicThreadPool } = require('../../../lib/index')
const min = 1
const max = 3
-const pool = new DynamicThreadPool(min, max, './tests/workers/testWorker.js', {
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
-})
+const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker/thread/testWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
describe('Dynamic thread pool test suite ', () => {
it('Verify that the function is executed in a worker thread', async () => {
})
it('Should work even without opts in input', async () => {
- const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js')
+ const pool1 = new DynamicThreadPool(
+ 1,
+ 1,
+ './tests/worker/thread/testWorker.js'
+ )
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
const expect = require('expect')
-const { FixedThreadPool } = require('../lib/fixed')
+const { FixedThreadPool } = require('../../../lib/index')
const numThreads = 10
-const pool = new FixedThreadPool(numThreads, './tests/workers/testWorker.js', {
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
-})
-const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js')
-const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js')
-const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', {
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
-})
-const asyncPool = new FixedThreadPool(1, './tests/workers/asyncWorker.js')
+const pool = new FixedThreadPool(
+ numThreads,
+ './tests/worker/thread/testWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+const emptyPool = new FixedThreadPool(1, './tests/worker/thread/emptyWorker.js')
+const echoPool = new FixedThreadPool(1, './tests/worker/thread/echoWorker.js')
+const errorPool = new FixedThreadPool(
+ 1,
+ './tests/worker/thread/errorWorker.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+)
+const asyncPool = new FixedThreadPool(1, './tests/worker/thread/asyncWorker.js')
describe('Fixed thread pool test suite ', () => {
it('Choose worker round robin test', async () => {
})
it('Should work even without opts in input', async () => {
- const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js')
+ const pool1 = new FixedThreadPool(1, './tests/worker/thread/testWorker.js')
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+async function error (data) {
+ return new Promise((resolve, reject) => {
+ setTimeout(
+ () => reject(new Error('Error Message from ClusterWorker:async')),
+ 2000
+ )
+ })
+}
+
+module.exports = new ClusterWorker(error, {
+ maxInactiveTime: 500,
+ async: true
+})
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+async function sleep (data) {
+ return new Promise((resolve, reject) => {
+ setTimeout(() => resolve(data), 2000)
+ })
+}
+
+module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function echo (data) {
+ return data
+}
+
+module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 })
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function test (data) {}
+
+module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+
+function error (data) {
+ throw new Error('Error Message from ClusterWorker')
+}
+
+module.exports = new ClusterWorker(error, {
+ maxInactiveTime: 500,
+ async: false
+})
--- /dev/null
+'use strict'
+const { ClusterWorker } = require('../../../lib/index')
+const cluster = require('cluster')
+
+function test (data) {
+ for (let i = 0; i <= 50; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ return cluster.isMaster
+}
+
+module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
async function sleep (data) {
return new Promise((resolve, reject) => {
'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
function echo (data) {
return data
'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
function test (data) {}
'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
function error (data) {
throw new Error(data)
'use strict'
-const { ThreadWorker } = require('../../lib/workers')
+const { ThreadWorker } = require('../../../lib/index')
const { isMainThread } = require('worker_threads')
function test (data) {