X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Ffixed.ts;h=d2c6ba0c2fd542c8dd928d08ad6f54902224734b;hb=776419ad922294f825dbaf8d1b55595d3706d102;hp=39ee595408be3c5ddf07d2d2419c1596628bc644;hpb=60fbd6d6188b0902d157fd0cde04d6af3a391e32;p=poolifier.git diff --git a/src/fixed.ts b/src/fixed.ts index 39ee5954..d2c6ba0c 100644 --- a/src/fixed.ts +++ b/src/fixed.ts @@ -1,9 +1,4 @@ -/* eslint-disable @typescript-eslint/strict-boolean-expressions */ - -import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads' - -function empty (): void {} -const _void = {} +import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' export type Draft = { -readonly [P in keyof T]?: T[P] } @@ -38,19 +33,18 @@ export interface FixedThreadPoolOptions { * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any export class FixedThreadPool { public readonly workers: WorkerWithMessageChannel[] = [] public nextWorker: number = 0 // threadId as key and an integer value - /* eslint-disable @typescript-eslint/indent */ public readonly tasks: Map = new Map< WorkerWithMessageChannel, number >() - /* eslint-enable @typescript-eslint/indent */ - protected _id: number = 0 + protected id: number = 0 /** * @param numThreads Num of threads for this worker pool. @@ -71,7 +65,7 @@ export class FixedThreadPool { } for (let i = 1; i <= this.numThreads; i++) { - this._newWorker() + this.newWorker() } } @@ -87,34 +81,32 @@ export class FixedThreadPool { * @param data The input for the task specified. * @returns Promise that is resolved when the task is done. */ - // eslint-disable-next-line @typescript-eslint/promise-function-async public execute (data: Data): Promise { // configure worker to handle message with the specified task - const worker = this._chooseWorker() + const worker = this.chooseWorker() const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { this.tasks.set(worker, previousWorkerIndex + 1) } else { throw Error('Worker could not be found in tasks map') } - const id = ++this._id - const res = this._execute(worker, id) - worker.postMessage({ data: data || _void, _id: id }) + const id = ++this.id + const res = this.internalExecute(worker, id) + worker.postMessage({ data: data || {}, id: id }) return res } - // eslint-disable-next-line @typescript-eslint/promise-function-async - protected _execute ( + protected internalExecute ( worker: WorkerWithMessageChannel, id: number ): Promise { return new Promise((resolve, reject) => { const listener = (message: { - _id: number + id: number error?: string data: Response }): void => { - if (message._id === id) { + if (message.id === id) { worker.port2?.removeListener('message', listener) const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { @@ -130,7 +122,7 @@ export class FixedThreadPool { }) } - protected _chooseWorker (): WorkerWithMessageChannel { + protected chooseWorker (): WorkerWithMessageChannel { if (this.workers.length - 1 === this.nextWorker) { this.nextWorker = 0 return this.workers[this.nextWorker] @@ -140,14 +132,14 @@ export class FixedThreadPool { } } - protected _newWorker (): WorkerWithMessageChannel { + protected newWorker (): WorkerWithMessageChannel { const worker: WorkerWithMessageChannel = new Worker(this.filePath, { env: SHARE_ENV }) - worker.on('error', this.opts.errorHandler ?? empty) - worker.on('online', this.opts.onlineHandler ?? empty) + worker.on('error', this.opts.errorHandler ?? (() => {})) + worker.on('online', this.opts.onlineHandler ?? (() => {})) // TODO handle properly when a thread exit - worker.on('exit', this.opts.exitHandler ?? empty) + worker.on('exit', this.opts.exitHandler ?? (() => {})) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1])