-import { availableParallelism } from 'poolifier'
+import { availableParallelism, type ThreadPoolOptions } from 'poolifier'
import type { WorkerOptions } from './WorkerTypes.js'
export const DEFAULT_ELEMENT_ADD_DELAY = 0
export const DEFAULT_WORKER_START_DELAY = 500
-export const DEFAULT_POOL_MIN_SIZE = Math.floor(availableParallelism() / 2)
-export const DEFAULT_POOL_MAX_SIZE = Math.round(availableParallelism() * 1.5)
+export const DEFAULT_POOL_MIN_SIZE = Math.max(1, Math.floor(availableParallelism() / 2))
+export const DEFAULT_POOL_MAX_SIZE = Math.max(
+ DEFAULT_POOL_MIN_SIZE,
+ Math.round(availableParallelism() * 1.5)
+)
export const DEFAULT_ELEMENTS_PER_WORKER = 1
+export const DEFAULT_POOL_OPTIONS: Readonly<ThreadPoolOptions> = Object.freeze({
+ enableEvents: true,
+ errorHandler: defaultErrorHandler,
+ exitHandler: defaultExitHandler,
+ restartWorkerOnError: true,
+ startWorkers: false,
+})
+
export const DEFAULT_WORKER_OPTIONS: Readonly<WorkerOptions> = Object.freeze({
elementAddDelay: DEFAULT_ELEMENT_ADD_DELAY,
elementsPerWorker: DEFAULT_ELEMENTS_PER_WORKER,
poolMaxSize: DEFAULT_POOL_MAX_SIZE,
poolMinSize: DEFAULT_POOL_MIN_SIZE,
- poolOptions: {
- enableEvents: true,
- errorHandler: defaultErrorHandler,
- exitHandler: defaultExitHandler,
- restartWorkerOnError: true,
- startWorkers: false,
- },
+ poolOptions: DEFAULT_POOL_OPTIONS,
workerStartDelay: DEFAULT_WORKER_START_DELAY,
})
}
private readonly promiseResponseMap: Map<
- `${string}-${string}-${string}-${string}`,
+ `${string}-${string}-${string}-${string}-${string}`,
ResponseWrapper<R>
>
}
this.workerSet = new Set<WorkerSetElement>()
this.promiseResponseMap = new Map<
- `${string}-${string}-${string}-${string}`,
+ `${string}-${string}-${string}-${string}-${string}`,
ResponseWrapper<R>
>()
if (this.workerOptions.poolOptions?.enableEvents === true) {
event: WorkerMessageEvents.addWorkerElement,
uuid: randomUUID(),
} satisfies WorkerMessage<D>
- workerSetElement.worker.postMessage(message)
this.promiseResponseMap.set(message.uuid, {
reject,
resolve,
workerSetElement,
})
+ workerSetElement.worker.postMessage(message)
})
const response = await sendMessageToWorker
// Add element sequentially to optimize memory at startup
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- if (this.workerOptions.elementAddDelay! > 0) {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))
+ if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) {
+ await sleep(randomizeDelay(this.workerOptions.elementAddDelay))
}
return response
}
this.workerOptions.workerStartDelay! > 0 &&
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
(await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
- this.emitter?.emit(WorkerSetEvents.started, this.info)
this.started = true
+ this.emitter?.emit(WorkerSetEvents.started, this.info)
}
/** @inheritDoc */
await worker.terminate()
await waitWorkerExit
}
- this.emitter?.emit(WorkerSetEvents.stopped, this.info)
+ for (const [uuid, responseWrapper] of this.promiseResponseMap) {
+ try {
+ responseWrapper.reject(
+ new Error(`WorkerSet stopped before responding request (uuid: ${uuid})`)
+ )
+ } finally {
+ this.promiseResponseMap.delete(uuid)
+ }
+ }
+ if (this.workerSet.size > 0) {
+ this.workerSet.clear()
+ }
this.started = false
+ this.emitter?.emit(WorkerSetEvents.stopped, this.info)
this.emitter?.emitDestroy()
}
worker.on('message', (message: WorkerMessage<R>) => {
const { data, event, uuid } = message
if (this.promiseResponseMap.has(uuid)) {
+ let error: Error
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { reject, resolve, workerSetElement } = this.promiseResponseMap.get(uuid)!
switch (event) {
reject(data)
break
default:
- reject(
- new Error(
- `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
- data,
- undefined,
- 2
- )}'`
- )
+ error = new Error(
+ `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
+ data,
+ undefined,
+ 2
+ )}'`
)
+ this.emitter?.emit(WorkerSetEvents.error, error)
+ reject(error)
}
this.promiseResponseMap.delete(uuid)
+ } else {
+ this.emitter?.emit(WorkerSetEvents.elementError, {
+ data,
+ event,
+ message: `Unknown worker message uuid: '${uuid}'`,
+ })
}
})
worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
worker.once('error', error => {
this.emitter?.emit(WorkerSetEvents.error, error)
+ const workerSetElement = this.getWorkerSetElementByWorker(worker)
+ if (workerSetElement != null) {
+ this.rejectPendingPromiseForWorker(workerSetElement, error)
+ }
if (
this.workerOptions.poolOptions?.restartWorkerOnError === true &&
this.started &&
worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
- this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
+ const workerSetElement = this.getWorkerSetElementByWorker(worker)
+ if (workerSetElement != null) {
+ this.rejectPendingPromiseForWorker(workerSetElement, new Error('Worker exited'))
+ }
+ this.removeWorkerSetElement(workerSetElement)
})
const workerSetElement: WorkerSetElement = {
numberOfWorkerElements: 0,
if (chosenWorkerSetElement == null) {
chosenWorkerSetElement = this.addWorkerSetElement()
// Add worker set element sequentially to optimize memory at startup
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.workerOptions.workerStartDelay! > 0 &&
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
+ if (this.workerOptions.workerStartDelay != null && this.workerOptions.workerStartDelay > 0) {
+ await sleep(randomizeDelay(this.workerOptions.workerStartDelay))
+ }
}
return chosenWorkerSetElement
}
private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement {
- let workerSetElt: undefined | WorkerSetElement
+ let workerSetElementFound: undefined | WorkerSetElement
for (const workerSetElement of this.workerSet) {
if (workerSetElement.worker.threadId === worker.threadId) {
- workerSetElt = workerSetElement
+ workerSetElementFound = workerSetElement
break
}
}
- return workerSetElt
+ return workerSetElementFound
+ }
+
+ private rejectPendingPromiseForWorker (workerSetElement: WorkerSetElement, reason: unknown): void {
+ for (const [uuid, responseWrapper] of this.promiseResponseMap) {
+ if (responseWrapper.workerSetElement === workerSetElement) {
+ try {
+ responseWrapper.reject(
+ reason ?? new Error(`Worker failed before completing request (uuid: ${uuid})`)
+ )
+ } finally {
+ this.promiseResponseMap.delete(uuid)
+ }
+ }
+ }
}
private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void {