Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
19 files changed:
'poolify',
'readonly',
'resize',
'poolify',
'readonly',
'resize',
'sinon',
'threadjs',
'threadwork',
'sinon',
'threadjs',
'threadwork',
"autobuild",
"Benoit",
"caffeinate",
"autobuild",
"Benoit",
"caffeinate",
"codeql",
"commitlint",
"Dependabot",
"codeql",
"commitlint",
"Dependabot",
"poolify",
"preinstall",
"Quadflieg",
"poolify",
"preinstall",
"Quadflieg",
"Shinigami",
"sonarsource",
"suchmokuo",
"Shinigami",
"sonarsource",
"suchmokuo",
#### New type definitions for input data and response
#### New type definitions for input data and response
-For cluster worker and worker-thread pools, you can now only send and receive serializable data.
+For cluster worker and worker-thread pools, you can now only send and receive structured-cloneable data.
_This is not a limitation by poolifier but NodeJS._
#### Public property replacements
_This is not a limitation by poolifier but NodeJS._
#### Public property replacements
**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.
-Remember that workers can only send and receive serializable data.
+Remember that workers can only send and receive structured-cloneable data.
constructor () {
// eslint-disable-next-line @typescript-eslint/promise-function-async
super((data: MyData) => this.process(data), {
constructor () {
// eslint-disable-next-line @typescript-eslint/promise-function-async
super((data: MyData) => this.process(data), {
- maxInactiveTime: 60000,
- async: true
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
- * Setup hook to execute code before worker node are created in the abstract constructor.
- * Can be overridden
+ * Setup hook to execute code before worker nodes are created in the abstract constructor.
+ * Can be overridden.
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
*
* @param worker - The newly created worker.
*/
*
* @param worker - The newly created worker.
*/
- private afterWorkerSetup (worker: Worker): void {
+ protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
}
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
}
/**
* A cluster pool with a fixed number of workers.
*
/**
* A cluster pool with a fixed number of workers.
*
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Christopher Quadflieg](https://github.com/Shinigami92)
/**
* A thread pool with a fixed number of threads.
*
/**
* A thread pool with a fixed number of threads.
*
- * It is possible to perform tasks in sync or asynchronous mode as you prefer.
- *
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
* @typeParam Response - Type of execution response. This can only be structured-cloneable data.
* @author [Alessandro Pio Ardizio](https://github.com/pioardi)
*
* @typeParam Data - Type of data sent to the worker or execution response. This can only be structured-cloneable data.
* @typeParam ErrorData - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
*
* @typeParam Data - Type of data sent to the worker or execution response. This can only be structured-cloneable data.
* @typeParam ErrorData - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
- * @typeParam MainWorker - Type of main worker.
-export interface MessageValue<
- Data = unknown,
- ErrorData = unknown,
- MainWorker = NodeJS.Process | MessagePort
-> extends Task<Data> {
+export interface MessageValue<Data = unknown, ErrorData = unknown>
+ extends Task<Data> {
* Task performance.
*/
readonly taskPerformance?: TaskPerformance
* Task performance.
*/
readonly taskPerformance?: TaskPerformance
- /**
- * Reference to main worker.
- */
- readonly parent?: MainWorker
/**
* Whether to compute the given statistics or not.
*/
/**
* Whether to compute the given statistics or not.
*/
import { AsyncResource } from 'node:async_hooks'
import { AsyncResource } from 'node:async_hooks'
+import type { Worker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
import { performance } from 'node:perf_hooks'
import type {
import type { MessagePort } from 'node:worker_threads'
import { performance } from 'node:perf_hooks'
import type {
* @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
*/
export abstract class AbstractWorker<
* @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
*/
export abstract class AbstractWorker<
- MainWorker extends NodeJS.Process | MessagePort,
+ MainWorker extends Worker | MessagePort,
Data = unknown,
Response = unknown
> extends AsyncResource {
Data = unknown,
Response = unknown
> extends AsyncResource {
*
* @param message - Message received.
*/
*
* @param message - Message received.
*/
- protected messageListener (
- message: MessageValue<Data, Data, MainWorker>
- ): void {
+ protected messageListener (message: MessageValue<Data, Data>): void {
if (message.id != null && message.data != null) {
// Task message received
const fn = this.getTaskFunction(message.name)
if (message.id != null && message.data != null) {
// Task message received
const fn = this.getTaskFunction(message.name)
} else {
this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
} else {
this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
}
- } else if (message.parent != null) {
- // Main worker reference message received
- this.mainWorker = message.parent
} else if (message.statistics != null) {
// Statistics message received
this.statistics = message.statistics
} else if (message.statistics != null) {
// Statistics message received
this.statistics = message.statistics
-import cluster from 'node:cluster'
+import cluster, { type Worker } from 'node:cluster'
import type { MessageValue } from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
import type { MessageValue } from '../utility-types'
import { AbstractWorker } from './abstract-worker'
import type { WorkerOptions } from './worker-options'
export class ClusterWorker<
Data = unknown,
Response = unknown
export class ClusterWorker<
Data = unknown,
Response = unknown
-> extends AbstractWorker<NodeJS.Process, Data, Response> {
+> extends AbstractWorker<Worker, Data, Response> {
/**
* Constructs a new poolifier cluster worker.
*
/**
* Constructs a new poolifier cluster worker.
*
'worker-cluster-pool:poolifier',
cluster.isPrimary,
taskFunctions,
'worker-cluster-pool:poolifier',
cluster.isPrimary,
taskFunctions,
+ cluster.worker as Worker,
opts
)
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
opts
)
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
- const mainWorker = this.getMainWorker()
- if (mainWorker.send == null) {
- throw new Error('Main worker does not support IPC communication')
- }
- mainWorker.send(message)
+ this.getMainWorker().send(message)
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
module.exports = new ClusterWorker(sleep, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
}
module.exports = new ClusterWorker(sleep, {
}
module.exports = new ClusterWorker(sleep, {
- maxInactiveTime: 500,
- async: true
module.exports = new ThreadWorker(error, {
maxInactiveTime: 500,
module.exports = new ThreadWorker(error, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
module.exports = new ThreadWorker(sleep, {
maxInactiveTime: 500,
killBehavior: KillBehaviors.HARD
})
killBehavior: KillBehaviors.HARD
})
}
module.exports = new ThreadWorker(sleep, {
}
module.exports = new ThreadWorker(sleep, {
- maxInactiveTime: 500,
- async: true