fix: fix worker options argument passing to worker pool/set
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 10 Jun 2023 21:57:38 +0000 (23:57 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 10 Jun 2023 21:57:38 +0000 (23:57 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/Bootstrap.ts
src/worker/WorkerAbstract.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFactory.ts
src/worker/WorkerSet.ts
src/worker/WorkerStaticPool.ts
src/worker/WorkerTypes.ts
src/worker/index.ts

index b59d925fb233202461ba71d2d07a9d9d3c7f5c01..d56beefcdc4df36076911751ce32f54dd26edf3e 100644 (file)
@@ -3,7 +3,7 @@
 import { EventEmitter } from 'node:events';
 import path from 'node:path';
 import { fileURLToPath } from 'node:url';
-import { type Worker, isMainThread } from 'node:worker_threads';
+import { isMainThread } from 'node:worker_threads';
 
 import chalk from 'chalk';
 
@@ -31,7 +31,7 @@ import {
   handleUnhandledRejection,
   logger,
 } from '../utils';
-import { type MessageHandler, type WorkerAbstract, WorkerFactory } from '../worker';
+import { type WorkerAbstract, WorkerFactory } from '../worker';
 
 const moduleName = 'Bootstrap';
 
@@ -201,8 +201,8 @@ export class Bootstrap extends EventEmitter {
           elementsPerWorker: Configuration.getWorker().elementsPerWorker,
           poolOptions: {
             workerChoiceStrategy: Configuration.getWorker().poolStrategy,
+            messageHandler: this.messageHandler.bind(this) as (message: unknown) => void,
           },
-          messageHandler: this.messageHandler.bind(this) as MessageHandler<Worker>,
         }
       ));
   }
index d42f629db3ff0b5ced69039c28c2b180dab7731b..0ef5470649c2e943e0ff7134cb89b9a3ed29ae3a 100644 (file)
@@ -1,10 +1,12 @@
 import type EventEmitterAsyncResource from 'node:events';
 import fs from 'node:fs';
+import type { Worker } from 'node:worker_threads';
 
-import type { PoolInfo } from 'poolifier';
+import type { ErrorHandler, ExitHandler, PoolInfo } from 'poolifier';
 
 import { WorkerConstants } from './WorkerConstants';
 import type { SetInfo, WorkerData, WorkerOptions } from './WorkerTypes';
+import { defaultErrorHandler, defaultExitHandler } from './WorkerUtils';
 
 export abstract class WorkerAbstract<T extends WorkerData> {
   protected readonly workerScript: string;
@@ -29,7 +31,6 @@ export abstract class WorkerAbstract<T extends WorkerData> {
       poolMaxSize: WorkerConstants.DEFAULT_POOL_MAX_SIZE,
       elementsPerWorker: WorkerConstants.DEFAULT_ELEMENTS_PER_WORKER,
       poolOptions: {},
-      messageHandler: WorkerConstants.EMPTY_FUNCTION,
     }
   ) {
     if (workerScript === null || workerScript === undefined) {
@@ -43,6 +44,14 @@ export abstract class WorkerAbstract<T extends WorkerData> {
     }
     this.workerScript = workerScript;
     this.workerOptions = workerOptions;
+    this.workerOptions.poolOptions?.messageHandler?.bind(this);
+    this.workerOptions.poolOptions.errorHandler = (
+      this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler
+    ).bind(this) as ErrorHandler<Worker>;
+    this.workerOptions.poolOptions?.onlineHandler?.bind(this);
+    this.workerOptions.poolOptions.exitHandler = (
+      this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler
+    ).bind(this) as ExitHandler<Worker>;
   }
 
   /**
index 38d85aea06d0252531076338382cf161e447d991..4be6bd572bf5d258f40501ef4fa163eca3a4c8b9 100644 (file)
@@ -1,11 +1,10 @@
 import type EventEmitterAsyncResource from 'node:events';
-import type { Worker } from 'node:worker_threads';
 
-import { DynamicThreadPool, type ErrorHandler, type ExitHandler, type PoolInfo } from 'poolifier';
+import { DynamicThreadPool, type PoolInfo } from 'poolifier';
 
 import { WorkerAbstract } from './WorkerAbstract';
 import type { WorkerData, WorkerOptions } from './WorkerTypes';
-import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
+import { sleep } from './WorkerUtils';
 
 export class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
   private readonly pool: DynamicThreadPool<WorkerData>;
@@ -18,13 +17,6 @@ export class WorkerDynamicPool extends WorkerAbstract<WorkerData> {
    */
   constructor(workerScript: string, workerOptions?: WorkerOptions) {
     super(workerScript, workerOptions);
-    this.workerOptions.poolOptions.errorHandler = (
-      this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler
-    ).bind(this) as ErrorHandler<Worker>;
-    this.workerOptions.poolOptions.exitHandler = (
-      this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler
-    ).bind(this) as ExitHandler<Worker>;
-    this.workerOptions.poolOptions.messageHandler.bind(this);
     this.pool = new DynamicThreadPool<WorkerData>(
       this.workerOptions.poolMinSize,
       this.workerOptions.poolMaxSize,
index 0a532681cf9a12d41ffb0c73b13b3fd71c8643e7..35fe82eb4c73a017d01316e265ddc7348fc72025 100644 (file)
@@ -1,6 +1,6 @@
-import { type Worker, isMainThread } from 'node:worker_threads';
+import { isMainThread } from 'node:worker_threads';
 
-import type { PoolOptions } from 'poolifier';
+import type { ThreadPoolOptions } from 'poolifier';
 
 import type { WorkerAbstract } from './WorkerAbstract';
 import { WorkerConstants } from './WorkerConstants';
@@ -27,9 +27,7 @@ export class WorkerFactory {
       workerOptions?.workerStartDelay ?? WorkerConstants.DEFAULT_WORKER_START_DELAY;
     workerOptions.elementStartDelay =
       workerOptions?.elementStartDelay ?? WorkerConstants.DEFAULT_ELEMENT_START_DELAY;
-    workerOptions.poolOptions = workerOptions?.poolOptions ?? ({} as PoolOptions<Worker>);
-    workerOptions?.messageHandler &&
-      (workerOptions.poolOptions.messageHandler = workerOptions.messageHandler);
+    workerOptions.poolOptions = workerOptions?.poolOptions ?? ({} as ThreadPoolOptions);
     let workerImplementation: WorkerAbstract<T> | null = null;
     switch (workerProcessType) {
       case WorkerProcessType.workerSet:
index 9efc88c1b3e76e53d5a58a09f2b06c1c3bb31436..da5684b0345bb2546fd0094e30e991edbb12634a 100644 (file)
@@ -1,12 +1,11 @@
 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
 
 import EventEmitterAsyncResource from 'node:events';
-import { Worker } from 'node:worker_threads';
+import { SHARE_ENV, Worker } from 'node:worker_threads';
 
 import { WorkerAbstract } from './WorkerAbstract';
 import { WorkerConstants } from './WorkerConstants';
 import {
-  type MessageHandler,
   type SetInfo,
   type WorkerData,
   WorkerMessageEvents,
@@ -14,7 +13,7 @@ import {
   type WorkerSetElement,
   WorkerSetEvents,
 } from './WorkerTypes';
-import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
+import { sleep } from './WorkerUtils';
 
 export class WorkerSet extends WorkerAbstract<WorkerData> {
   public readonly emitter: EventEmitterAsyncResource;
@@ -87,19 +86,30 @@ export class WorkerSet extends WorkerAbstract<WorkerData> {
    * Add a new `WorkerSetElement`.
    */
   private addWorkerSetElement(): WorkerSetElement {
-    const worker = new Worker(this.workerScript);
+    const worker = new Worker(this.workerScript, {
+      env: SHARE_ENV,
+      ...this.workerOptions.poolOptions.workerOptions,
+    });
     worker.on(
       'message',
-      (this.workerOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION).bind(
-        this
-      ) as MessageHandler<Worker>
+      this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION
+    );
+    worker.on(
+      'error',
+      this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION
     );
-    worker.on('error', defaultErrorHandler.bind(this) as (err: Error) => void);
     worker.on('error', (error) => {
       this.emitter.emit(WorkerSetEvents.error, error);
       this.addWorkerSetElement();
     });
-    worker.on('exit', defaultExitHandler.bind(this) as (exitCode: number) => void);
+    worker.on(
+      'online',
+      this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION
+    );
+    worker.on(
+      'exit',
+      this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION
+    );
     worker.on('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker)));
     const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
     this.workerSet.add(workerSetElement);
index f5c2a6e89f60658dc0e30082d4aea62ad8e0ad62..473ff3bba41e0a332394f23a313a8040cf554179 100644 (file)
@@ -1,11 +1,10 @@
 import type EventEmitterAsyncResource from 'node:events';
-import type { Worker } from 'node:worker_threads';
 
-import { type ErrorHandler, type ExitHandler, FixedThreadPool, type PoolInfo } from 'poolifier';
+import { FixedThreadPool, type PoolInfo } from 'poolifier';
 
 import { WorkerAbstract } from './WorkerAbstract';
 import type { WorkerData, WorkerOptions } from './WorkerTypes';
-import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
+import { sleep } from './WorkerUtils';
 
 export class WorkerStaticPool extends WorkerAbstract<WorkerData> {
   private readonly pool: FixedThreadPool<WorkerData>;
@@ -18,13 +17,6 @@ export class WorkerStaticPool extends WorkerAbstract<WorkerData> {
    */
   constructor(workerScript: string, workerOptions?: WorkerOptions) {
     super(workerScript, workerOptions);
-    this.workerOptions.poolOptions.errorHandler = (
-      this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler
-    ).bind(this) as ErrorHandler<Worker>;
-    this.workerOptions.poolOptions.exitHandler = (
-      this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler
-    ).bind(this) as ExitHandler<Worker>;
-    this.workerOptions.poolOptions.messageHandler.bind(this);
     this.pool = new FixedThreadPool(
       this.workerOptions.poolMaxSize,
       this.workerScript,
index fa1c82b037a60c92dcdfe9366c9bde682b1fd8be..a6de3353cfbf74a3fd8f1a26055a7bfedaf3b9c4 100644 (file)
@@ -1,6 +1,6 @@
 import type { Worker } from 'node:worker_threads';
 
-import { type PoolEvent, PoolEvents, type PoolOptions } from 'poolifier';
+import { type PoolEvent, PoolEvents, type ThreadPoolOptions } from 'poolifier';
 
 export enum WorkerProcessType {
   workerSet = 'workerSet',
@@ -24,16 +24,13 @@ export const WorkerEvents = {
 } as const;
 export type WorkerEvents = PoolEvent | WorkerSetEvents;
 
-export type MessageHandler<T> = (this: T, message: unknown) => void;
-
 export type WorkerOptions = {
   workerStartDelay?: number;
   elementStartDelay?: number;
   poolMaxSize: number;
   poolMinSize: number;
   elementsPerWorker?: number;
-  poolOptions?: PoolOptions<Worker>;
-  messageHandler?: MessageHandler<Worker>;
+  poolOptions?: ThreadPoolOptions;
 };
 
 export type WorkerData = Record<string, unknown>;
index 9bf1e284f84ec4fb64c45c635af91115d1605cb5..50848d0900533e9d91cd3fe02b867c66797aad3b 100644 (file)
@@ -2,7 +2,6 @@ export type { WorkerAbstract } from './WorkerAbstract';
 export { WorkerConstants } from './WorkerConstants';
 export { WorkerFactory } from './WorkerFactory';
 export {
-  type MessageHandler,
   type WorkerData,
   WorkerEvents,
   type WorkerMessage,