1 import type { EventEmitter
} from
'node:events';
3 import { DynamicThreadPool
, type PoolInfo
} from
'poolifier';
5 import { WorkerAbstract
} from
'./WorkerAbstract';
6 import type { WorkerData
, WorkerOptions
} from
'./WorkerTypes';
7 import { randomizeDelay
, sleep
} from
'./WorkerUtils';
9 export class WorkerDynamicPool
extends WorkerAbstract
<WorkerData
> {
10 private readonly pool
: DynamicThreadPool
<WorkerData
>;
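/**
 * Creates a new `WorkerDynamicPool`.
 *
 * @param workerScript - Worker script, forwarded to the `WorkerAbstract` constructor.
 * @param workerOptions - Worker options, forwarded to the `WorkerAbstract` constructor; their `poolMinSize`, `poolMaxSize` and `poolOptions` are passed to the underlying `DynamicThreadPool`.
 */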
18 constructor(workerScript
: string, workerOptions
: WorkerOptions
) {
19 super(workerScript
, workerOptions
);
20 this.pool
= new DynamicThreadPool
<WorkerData
>(
21 this.workerOptions
.poolMinSize
,
22 this.workerOptions
.poolMaxSize
,
24 this.workerOptions
.poolOptions
,
/**
 * Information about the underlying pool.
 *
 * @returns The `info` value exposed by the wrapped `DynamicThreadPool`.
 */
get info(): PoolInfo {
  const { info: poolInfo } = this.pool;
  return poolInfo;
}
33 return this.pool
.info
.workerNodes
;
36 get
maxElementsPerWorker(): number | undefined {
/**
 * Event emitter of the underlying pool.
 *
 * @returns The pool's `emitter`; may be `undefined`, as the declared return
 * type allows (for instance when the optional chain short-circuits).
 */
get emitter(): EventEmitter | undefined {
  // The `as` cast only affects static typing; the value is returned untouched.
  const poolEmitter = this.pool?.emitter as EventEmitter;
  return poolEmitter;
}
/**
 * Start hook of this worker implementation.
 *
 * Deliberately a no-op: the method body performs no work.
 *
 * @returns A promise that resolves without a value.
 */
public async start(): Promise<void> {
  // Nothing to do here, on purpose.
}
/**
 * Stops this worker implementation by destroying the underlying pool.
 *
 * @returns A promise that settles once the pool's `destroy()` call has settled.
 */
public async stop(): Promise<void> {
  await this.pool.destroy();
}
/**
 * Submits an element to the pool and, when configured, pauses afterwards.
 *
 * @param elementData - Data handed to the pool's `execute()`.
 * @returns A promise resolved once `execute()` has resolved and, if
 * `workerOptions.elementStartDelay` is a positive number, once the randomized
 * delay derived from it has elapsed.
 */
public async addElement(elementData: WorkerData): Promise<void> {
  await this.pool.execute(elementData);
  // Start element sequentially to optimize memory at startup
  const { elementStartDelay } = this.workerOptions;
  // Explicit nullish guard instead of non-null assertions: an unset delay
  // simply skips the pause, exactly as the former `undefined > 0` check did.
  if (elementStartDelay != null && elementStartDelay > 0) {
    await sleep(randomizeDelay(elementStartDelay));
  }
}