1 import { FixedThreadPool
, PoolOptions
} from
'poolifier';
3 import Utils from
'../utils/Utils';
4 import { Worker
} from
'worker_threads';
5 import WorkerAbstract from
'./WorkerAbstract';
6 import { WorkerData
} from
'../types/Worker';
8 export default class WorkerStaticPool
<T
> extends WorkerAbstract
{
9 private pool
: StaticPool
;
12 * Create a new `WorkerStaticPool`.
14 * @param {string} workerScript
15 * @param {number} numberOfThreads
16 * @param {number} startWorkerDelay
17 * @param {PoolOptions} opts
19 constructor(workerScript
: string, numberOfThreads
: number, startWorkerDelay
?: number, opts
?: PoolOptions
<Worker
>) {
20 super(workerScript
, startWorkerDelay
);
21 this.pool
= StaticPool
.getInstance(numberOfThreads
, this.workerScript
, opts
);
25 return this.pool
.workers
.length
;
28 get
maxElementsPerWorker(): number | null {
34 * @returns {Promise<void>}
37 // eslint-disable-next-line @typescript-eslint/no-empty-function
38 public async start(): Promise
<void> { }
42 * @returns {Promise<void>}
45 public async stop(): Promise
<void> {
46 return this.pool
.destroy();
51 * @param {T} elementData
52 * @returns {Promise<void>}
55 public async addElement(elementData
: T
): Promise
<void> {
56 await this.pool
.execute(elementData
);
57 // Start worker sequentially to optimize memory at startup
58 await Utils
.sleep(this.workerStartDelay
);
62 class StaticPool
extends FixedThreadPool
<WorkerData
> {
63 private static instance
: StaticPool
;
65 private constructor(numberOfThreads
: number, workerScript
: string, opts
?: PoolOptions
<Worker
>) {
66 super(numberOfThreads
, workerScript
, opts
);
69 public static getInstance(numberOfThreads
: number, workerScript
: string, opts
?: PoolOptions
<Worker
>): StaticPool
{
70 if (!StaticPool
.instance
) {
71 opts
.exitHandler
= opts
?.exitHandler
?? ((code
) => {
73 console
.error(`Worker stopped with exit code ${code}`);
76 StaticPool
.instance
= new StaticPool(numberOfThreads
, workerScript
, opts
);
78 return StaticPool
.instance
;