1 import type EventEmitterAsyncResource from
'node:events';
2 import type { Worker
} from
'node:worker_threads';
4 import { DynamicThreadPool
, type ErrorHandler
, type ExitHandler
} from
'poolifier';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import type { WorkerData
, WorkerOptions
} from
'./WorkerTypes';
8 import { defaultErrorHandler
, defaultExitHandler
, sleep
} from
'./WorkerUtils';
10 export class WorkerDynamicPool
extends WorkerAbstract
<WorkerData
> {
11 private readonly pool
: DynamicThreadPool
<WorkerData
>;
14 * Create a new `WorkerDynamicPool`.
16 * @param workerScript -
17 * @param workerOptions -
19 constructor(workerScript
: string, workerOptions
?: WorkerOptions
) {
20 super(workerScript
, workerOptions
);
21 this.workerOptions
.poolOptions
.errorHandler
= (
22 this.workerOptions
?.poolOptions
?.errorHandler
?? defaultErrorHandler
23 ).bind(this) as ErrorHandler
<Worker
>;
24 this.workerOptions
.poolOptions
.exitHandler
= (
25 this.workerOptions
?.poolOptions
?.exitHandler
?? defaultExitHandler
26 ).bind(this) as ExitHandler
<Worker
>;
27 this.workerOptions
.poolOptions
.messageHandler
.bind(this);
28 this.pool
= new DynamicThreadPool
<WorkerData
>(
29 this.workerOptions
.poolMinSize
,
30 this.workerOptions
.poolMaxSize
,
32 this.workerOptions
.poolOptions
37 return this.pool
.workerNodes
.length
;
40 get
maxElementsPerWorker(): number | undefined {
44 get
emitter(): EventEmitterAsyncResource
| undefined {
45 return this.pool
?.emitter
;
49 public async start(): Promise
<void> {
50 // This is intentional
54 public async stop(): Promise
<void> {
55 return this.pool
.destroy();
59 public async addElement(elementData
: WorkerData
): Promise
<void> {
60 await this.pool
.execute(elementData
);
61 // Start element sequentially to optimize memory at startup
62 this.workerOptions
.elementStartDelay
> 0 && (await sleep(this.workerOptions
.elementStartDelay
));