From be245fdab36274873e0a9651589cebd097548076 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 10 Jun 2023 23:57:38 +0200 Subject: [PATCH] fix: fix worker options argument passing to worker pool/set MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/charging-station/Bootstrap.ts | 6 +++--- src/worker/WorkerAbstract.ts | 13 +++++++++++-- src/worker/WorkerDynamicPool.ts | 12 ++---------- src/worker/WorkerFactory.ts | 8 +++----- src/worker/WorkerSet.ts | 28 +++++++++++++++++++--------- src/worker/WorkerStaticPool.ts | 12 ++---------- src/worker/WorkerTypes.ts | 7 ++----- src/worker/index.ts | 1 - 8 files changed, 42 insertions(+), 45 deletions(-) diff --git a/src/charging-station/Bootstrap.ts b/src/charging-station/Bootstrap.ts index b59d925f..d56beefc 100644 --- a/src/charging-station/Bootstrap.ts +++ b/src/charging-station/Bootstrap.ts @@ -3,7 +3,7 @@ import { EventEmitter } from 'node:events'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; -import { type Worker, isMainThread } from 'node:worker_threads'; +import { isMainThread } from 'node:worker_threads'; import chalk from 'chalk'; @@ -31,7 +31,7 @@ import { handleUnhandledRejection, logger, } from '../utils'; -import { type MessageHandler, type WorkerAbstract, WorkerFactory } from '../worker'; +import { type WorkerAbstract, WorkerFactory } from '../worker'; const moduleName = 'Bootstrap'; @@ -201,8 +201,8 @@ export class Bootstrap extends EventEmitter { elementsPerWorker: Configuration.getWorker().elementsPerWorker, poolOptions: { workerChoiceStrategy: Configuration.getWorker().poolStrategy, + messageHandler: this.messageHandler.bind(this) as (message: unknown) => void, }, - messageHandler: this.messageHandler.bind(this) as MessageHandler, } )); } diff --git a/src/worker/WorkerAbstract.ts b/src/worker/WorkerAbstract.ts index d42f629d..0ef54706 100644 --- a/src/worker/WorkerAbstract.ts +++ b/src/worker/WorkerAbstract.ts @@ -1,10 +1,12 @@ import type EventEmitterAsyncResource from 'node:events'; import fs from 'node:fs'; +import type { Worker } from 'node:worker_threads'; -import type { PoolInfo } from 'poolifier'; +import type { ErrorHandler, ExitHandler, PoolInfo } from 'poolifier'; import { WorkerConstants } from './WorkerConstants'; import type { SetInfo, WorkerData, WorkerOptions } from './WorkerTypes'; +import { defaultErrorHandler, defaultExitHandler } from './WorkerUtils'; export abstract class WorkerAbstract { protected readonly workerScript: string; @@ -29,7 +31,6 @@ export abstract class WorkerAbstract { poolMaxSize: WorkerConstants.DEFAULT_POOL_MAX_SIZE, elementsPerWorker: WorkerConstants.DEFAULT_ELEMENTS_PER_WORKER, poolOptions: {}, - messageHandler: WorkerConstants.EMPTY_FUNCTION, } ) { if (workerScript === null || workerScript === undefined) { @@ -43,6 +44,14 @@ export abstract class WorkerAbstract { } this.workerScript = workerScript; this.workerOptions = workerOptions; + this.workerOptions.poolOptions?.messageHandler?.bind(this); + this.workerOptions.poolOptions.errorHandler = ( + this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler + ).bind(this) as ErrorHandler; + this.workerOptions.poolOptions?.onlineHandler?.bind(this); + this.workerOptions.poolOptions.exitHandler = ( + this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler + ).bind(this) as ExitHandler; } /** diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts index 38d85aea..4be6bd57 100644 --- a/src/worker/WorkerDynamicPool.ts +++ b/src/worker/WorkerDynamicPool.ts @@ -1,11 +1,10 @@ import type EventEmitterAsyncResource from 'node:events'; -import type { Worker } from 'node:worker_threads'; -import { DynamicThreadPool, type ErrorHandler, type ExitHandler, type PoolInfo } from 'poolifier'; +import { DynamicThreadPool, type PoolInfo } from 'poolifier'; import { WorkerAbstract } from './WorkerAbstract'; import type { WorkerData, WorkerOptions } from './WorkerTypes'; -import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils'; +import { sleep } from './WorkerUtils'; export class WorkerDynamicPool extends WorkerAbstract { private readonly pool: DynamicThreadPool; @@ -18,13 +17,6 @@ export class WorkerDynamicPool extends WorkerAbstract { */ constructor(workerScript: string, workerOptions?: WorkerOptions) { super(workerScript, workerOptions); - this.workerOptions.poolOptions.errorHandler = ( - this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler - ).bind(this) as ErrorHandler; - this.workerOptions.poolOptions.exitHandler = ( - this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler - ).bind(this) as ExitHandler; - this.workerOptions.poolOptions.messageHandler.bind(this); this.pool = new DynamicThreadPool( this.workerOptions.poolMinSize, this.workerOptions.poolMaxSize, diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index 0a532681..35fe82eb 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -1,6 +1,6 @@ -import { type Worker, isMainThread } from 'node:worker_threads'; +import { isMainThread } from 'node:worker_threads'; -import type { PoolOptions } from 'poolifier'; +import type { ThreadPoolOptions } from 'poolifier'; import type { WorkerAbstract } from './WorkerAbstract'; import { WorkerConstants } from './WorkerConstants'; @@ -27,9 +27,7 @@ export class WorkerFactory { workerOptions?.workerStartDelay ?? WorkerConstants.DEFAULT_WORKER_START_DELAY; workerOptions.elementStartDelay = workerOptions?.elementStartDelay ?? WorkerConstants.DEFAULT_ELEMENT_START_DELAY; - workerOptions.poolOptions = workerOptions?.poolOptions ?? ({} as PoolOptions); - workerOptions?.messageHandler && - (workerOptions.poolOptions.messageHandler = workerOptions.messageHandler); + workerOptions.poolOptions = workerOptions?.poolOptions ?? ({} as ThreadPoolOptions); let workerImplementation: WorkerAbstract | null = null; switch (workerProcessType) { case WorkerProcessType.workerSet: diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 9efc88c1..da5684b0 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,12 +1,11 @@ // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved. import EventEmitterAsyncResource from 'node:events'; -import { Worker } from 'node:worker_threads'; +import { SHARE_ENV, Worker } from 'node:worker_threads'; import { WorkerAbstract } from './WorkerAbstract'; import { WorkerConstants } from './WorkerConstants'; import { - type MessageHandler, type SetInfo, type WorkerData, WorkerMessageEvents, @@ -14,7 +13,7 @@ import { type WorkerSetElement, WorkerSetEvents, } from './WorkerTypes'; -import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils'; +import { sleep } from './WorkerUtils'; export class WorkerSet extends WorkerAbstract { public readonly emitter: EventEmitterAsyncResource; @@ -87,19 +86,30 @@ export class WorkerSet extends WorkerAbstract { * Add a new `WorkerSetElement`. */ private addWorkerSetElement(): WorkerSetElement { - const worker = new Worker(this.workerScript); + const worker = new Worker(this.workerScript, { + env: SHARE_ENV, + ...this.workerOptions.poolOptions.workerOptions, + }); worker.on( 'message', - (this.workerOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION).bind( - this - ) as MessageHandler + this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION + ); + worker.on( + 'error', + this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION ); - worker.on('error', defaultErrorHandler.bind(this) as (err: Error) => void); worker.on('error', (error) => { this.emitter.emit(WorkerSetEvents.error, error); this.addWorkerSetElement(); }); - worker.on('exit', defaultExitHandler.bind(this) as (exitCode: number) => void); + worker.on( + 'online', + this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION + ); + worker.on( + 'exit', + this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION + ); worker.on('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker))); const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }; this.workerSet.add(workerSetElement); diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts index f5c2a6e8..473ff3bb 100644 --- a/src/worker/WorkerStaticPool.ts +++ b/src/worker/WorkerStaticPool.ts @@ -1,11 +1,10 @@ import type EventEmitterAsyncResource from 'node:events'; -import type { Worker } from 'node:worker_threads'; -import { type ErrorHandler, type ExitHandler, FixedThreadPool, type PoolInfo } from 'poolifier'; +import { FixedThreadPool, type PoolInfo } from 'poolifier'; import { WorkerAbstract } from './WorkerAbstract'; import type { WorkerData, WorkerOptions } from './WorkerTypes'; -import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils'; +import { sleep } from './WorkerUtils'; export class WorkerStaticPool extends WorkerAbstract { private readonly pool: FixedThreadPool; @@ -18,13 +17,6 @@ export class WorkerStaticPool extends WorkerAbstract { */ constructor(workerScript: string, workerOptions?: WorkerOptions) { super(workerScript, workerOptions); - this.workerOptions.poolOptions.errorHandler = ( - this.workerOptions?.poolOptions?.errorHandler ?? defaultErrorHandler - ).bind(this) as ErrorHandler; - this.workerOptions.poolOptions.exitHandler = ( - this.workerOptions?.poolOptions?.exitHandler ?? defaultExitHandler - ).bind(this) as ExitHandler; - this.workerOptions.poolOptions.messageHandler.bind(this); this.pool = new FixedThreadPool( this.workerOptions.poolMaxSize, this.workerScript, diff --git a/src/worker/WorkerTypes.ts b/src/worker/WorkerTypes.ts index fa1c82b0..a6de3353 100644 --- a/src/worker/WorkerTypes.ts +++ b/src/worker/WorkerTypes.ts @@ -1,6 +1,6 @@ import type { Worker } from 'node:worker_threads'; -import { type PoolEvent, PoolEvents, type PoolOptions } from 'poolifier'; +import { type PoolEvent, PoolEvents, type ThreadPoolOptions } from 'poolifier'; export enum WorkerProcessType { workerSet = 'workerSet', @@ -24,16 +24,13 @@ export const WorkerEvents = { } as const; export type WorkerEvents = PoolEvent | WorkerSetEvents; -export type MessageHandler = (this: T, message: unknown) => void; - export type WorkerOptions = { workerStartDelay?: number; elementStartDelay?: number; poolMaxSize: number; poolMinSize: number; elementsPerWorker?: number; - poolOptions?: PoolOptions; - messageHandler?: MessageHandler; + poolOptions?: ThreadPoolOptions; }; export type WorkerData = Record; diff --git a/src/worker/index.ts b/src/worker/index.ts index 9bf1e284..50848d09 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -2,7 +2,6 @@ export type { WorkerAbstract } from './WorkerAbstract'; export { WorkerConstants } from './WorkerConstants'; export { WorkerFactory } from './WorkerFactory'; export { - type MessageHandler, type WorkerData, WorkerEvents, type WorkerMessage, -- 2.34.1