X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fworkers.ts;h=57f2fd95bef547161ca510078a5191dd6bd0ee9b;hb=776419ad922294f825dbaf8d1b55595d3706d102;hp=990f2208bc0cd506d12776eaf1397035116294c9;hpb=e8dac82a8da9a04d91e89d788e3c966cd6cf095d;p=poolifier.git diff --git a/src/workers.ts b/src/workers.ts index 990f2208..57f2fd95 100644 --- a/src/workers.ts +++ b/src/workers.ts @@ -1,6 +1,5 @@ -import { isMainThread, parentPort } from 'worker_threads' - import { AsyncResource } from 'async_hooks' +import { isMainThread, parentPort } from 'worker_threads' export interface ThreadWorkerOptions { /** @@ -26,12 +25,13 @@ export interface ThreadWorkerOptions { * @author [Alessandro Pio Ardizio](https://github.com/pioardi) * @since 0.0.1 */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any export class ThreadWorker extends AsyncResource { protected readonly maxInactiveTime: number protected readonly async: boolean protected lastTask: number - protected readonly interval: NodeJS.Timeout - protected parent: any + protected readonly interval?: NodeJS.Timeout + protected parent?: MessagePort public constructor ( fn: (data: Data) => Response, @@ -39,70 +39,78 @@ export class ThreadWorker extends AsyncResource { ) { super('worker-thread-pool:pioardi') - this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60 + this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 this.async = !!this.opts.async this.lastTask = Date.now() if (!fn) throw new Error('Fn parameter is mandatory') // keep the worker active if (!isMainThread) { this.interval = setInterval( - this._checkAlive.bind(this), + this.checkAlive.bind(this), this.maxInactiveTime / 2 ) - this._checkAlive.bind(this)() + this.checkAlive.bind(this)() } - parentPort.on('message', (value) => { - if (value && value.data && value._id) { - // here you will receive messages - // console.log('This is the main thread ' + isMainThread) - if (this.async) { - this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) - } else { - this.runInAsyncScope(this._run.bind(this), this, fn, value) + parentPort?.on( + 'message', + (value: { + data?: Response + id?: number + parent?: MessagePort + kill?: number + }) => { + if (value?.data && value.id) { + // here you will receive messages + // console.log('This is the main thread ' + isMainThread) + if (this.async) { + this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this.run.bind(this), this, fn, value) + } + } else if (value.parent) { + // save the port to communicate with the main thread + // this will be received once + this.parent = value.parent + } else if (value.kill) { + // here is time to kill this thread, just clearing the interval + if (this.interval) clearInterval(this.interval) + this.emitDestroy() } - } else if (value.parent) { - // save the port to communicate with the main thread - // this will be received once - this.parent = value.parent - } else if (value.kill) { - // here is time to kill this thread, just clearing the interval - clearInterval(this.interval) - this.emitDestroy() } - }) + ) } - protected _checkAlive (): void { + protected checkAlive (): void { if (Date.now() - this.lastTask > this.maxInactiveTime) { - this.parent.postMessage({ kill: 1 }) + this.parent?.postMessage({ kill: 1 }) } } - protected _run ( + protected run ( fn: (data: Data) => Response, - value: { readonly data: Data, readonly _id: number } + value: { readonly data: Data; readonly id: number } ): void { try { const res = fn(value.data) - this.parent.postMessage({ data: res, _id: value._id }) + this.parent?.postMessage({ data: res, id: value.id }) this.lastTask = Date.now() } catch (e) { - this.parent.postMessage({ error: e, _id: value._id }) + this.parent?.postMessage({ error: e, id: value.id }) this.lastTask = Date.now() } } - protected _runAsync ( + protected runAsync ( fn: (data: Data) => Promise, - value: { readonly data: Data, readonly _id: number } + value: { readonly data: Data; readonly id: number } ): void { fn(value.data) - .then((res) => { - this.parent.postMessage({ data: res, _id: value._id }) + .then(res => { + this.parent?.postMessage({ data: res, id: value.id }) this.lastTask = Date.now() }) - .catch((e) => { - this.parent.postMessage({ error: e, _id: value._id }) + .catch(e => { + this.parent?.postMessage({ error: e, id: value.id }) this.lastTask = Date.now() }) }