X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Ffixed.ts;h=d2c6ba0c2fd542c8dd928d08ad6f54902224734b;hb=776419ad922294f825dbaf8d1b55595d3706d102;hp=dee28c53fc00d0f62f5d9ea753a8ecad3151bba3;hpb=1eaa679e42a2a158b38e6fd615fd04b1bf6eb694;p=poolifier.git diff --git a/src/fixed.ts b/src/fixed.ts index dee28c53..d2c6ba0c 100644 --- a/src/fixed.ts +++ b/src/fixed.ts @@ -1,7 +1,4 @@ -import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads' - -function empty () {} -const _void = {} +import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' export type Draft = { -readonly [P in keyof T]?: T[P] } @@ -36,7 +33,8 @@ export interface FixedThreadPoolOptions { * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 */ -export default class FixedThreadPool { +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class FixedThreadPool { public readonly workers: WorkerWithMessageChannel[] = [] public nextWorker: number = 0 @@ -46,7 +44,7 @@ export default class FixedThreadPool { number >() - protected _id: number = 0 + protected id: number = 0 /** * @param numThreads Num of threads for this worker pool. @@ -61,12 +59,13 @@ export default class FixedThreadPool { if (!isMainThread) { throw new Error('Cannot start a thread pool from a worker thread !!!') } + // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check if (!this.filePath) { throw new Error('Please specify a file with a worker implementation') } for (let i = 1; i <= this.numThreads; i++) { - this._newWorker() + this.newWorker() } } @@ -82,34 +81,48 @@ export default class FixedThreadPool { * @param data The input for the task specified. * @returns Promise that is resolved when the task is done. */ - public async execute (data: Data): Promise { + public execute (data: Data): Promise { // configure worker to handle message with the specified task - const worker = this._chooseWorker() - this.tasks.set(worker, this.tasks.get(worker) + 1) - const id = ++this._id - const res = this._execute(worker, id) - worker.postMessage({ data: data || _void, _id: id }) + const worker = this.chooseWorker() + const previousWorkerIndex = this.tasks.get(worker) + if (previousWorkerIndex !== undefined) { + this.tasks.set(worker, previousWorkerIndex + 1) + } else { + throw Error('Worker could not be found in tasks map') + } + const id = ++this.id + const res = this.internalExecute(worker, id) + worker.postMessage({ data: data || {}, id: id }) return res } - protected _execute ( + protected internalExecute ( worker: WorkerWithMessageChannel, id: number ): Promise { return new Promise((resolve, reject) => { - const listener = (message) => { - if (message._id === id) { - worker.port2.removeListener('message', listener) - this.tasks.set(worker, this.tasks.get(worker) - 1) + const listener = (message: { + id: number + error?: string + data: Response + }): void => { + if (message.id === id) { + worker.port2?.removeListener('message', listener) + const previousWorkerIndex = this.tasks.get(worker) + if (previousWorkerIndex !== undefined) { + this.tasks.set(worker, previousWorkerIndex + 1) + } else { + throw Error('Worker could not be found in tasks map') + } if (message.error) reject(message.error) else resolve(message.data) } } - worker.port2.on('message', listener) + worker.port2?.on('message', listener) }) } - protected _chooseWorker (): WorkerWithMessageChannel { + protected chooseWorker (): WorkerWithMessageChannel { if (this.workers.length - 1 === this.nextWorker) { this.nextWorker = 0 return this.workers[this.nextWorker] @@ -119,14 +132,14 @@ export default class FixedThreadPool { } } - protected _newWorker (): WorkerWithMessageChannel { + protected newWorker (): WorkerWithMessageChannel { const worker: WorkerWithMessageChannel = new Worker(this.filePath, { env: SHARE_ENV }) - worker.on('error', this.opts.errorHandler || empty) - worker.on('online', this.opts.onlineHandler || empty) + worker.on('error', this.opts.errorHandler ?? (() => {})) + worker.on('online', this.opts.onlineHandler ?? (() => {})) // TODO handle properly when a thread exit - worker.on('exit', this.opts.exitHandler || empty) + worker.on('exit', this.opts.exitHandler ?? (() => {})) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) @@ -134,11 +147,9 @@ export default class FixedThreadPool { worker.port2 = port2 // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size - worker.port2.setMaxListeners(this.opts.maxTasks || 1000) + worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) // init tasks map this.tasks.set(worker, 0) return worker } } - -module.exports = FixedThreadPool