From ee99693bf5c34d3acaf39db6d4506297b183e47d Mon Sep 17 00:00:00 2001 From: Shinigami92 Date: Sun, 7 Feb 2021 15:23:27 +0100 Subject: [PATCH] Use strict compiler flag --- src/dynamic.ts | 9 ++++--- src/fixed.ts | 33 +++++++++++++++++--------- src/index.ts | 6 ++--- src/workers.ts | 64 +++++++++++++++++++++++++++++--------------------- tsconfig.json | 3 ++- 5 files changed, 70 insertions(+), 45 deletions(-) diff --git a/src/dynamic.ts b/src/dynamic.ts index 667fa07a..211141ca 100644 --- a/src/dynamic.ts +++ b/src/dynamic.ts @@ -1,3 +1,5 @@ +/* eslint-disable @typescript-eslint/strict-boolean-expressions */ + import FixedThreadPool, { FixedThreadPoolOptions, WorkerWithMessageChannel @@ -42,7 +44,7 @@ export default class DynamicThreadPool< } protected _chooseWorker (): WorkerWithMessageChannel { - let worker: WorkerWithMessageChannel + let worker: WorkerWithMessageChannel | undefined for (const entry of this.tasks) { if (entry[1] === 0) { worker = entry[0] @@ -60,10 +62,11 @@ export default class DynamicThreadPool< } // all workers are busy create a new worker const worker = this._newWorker() - worker.port2.on('message', (message) => { + worker.port2?.on('message', (message: { kill?: number }) => { if (message.kill) { worker.postMessage({ kill: 1 }) - worker.terminate() + // eslint-disable-next-line no-void + void worker.terminate() // clean workers from data structures const workerIndex = this.workers.indexOf(worker) this.workers.splice(workerIndex, 1) diff --git a/src/fixed.ts b/src/fixed.ts index dee28c53..43171008 100644 --- a/src/fixed.ts +++ b/src/fixed.ts @@ -1,6 +1,8 @@ +/* eslint-disable @typescript-eslint/strict-boolean-expressions */ + import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads' -function empty () {} +function empty (): void {} const _void = {} export type Draft = { -readonly [P in keyof T]?: T[P] } @@ -41,10 +43,12 @@ export default class FixedThreadPool { public nextWorker: number = 0 // threadId as key and an integer value + /* eslint-disable @typescript-eslint/indent */ public readonly tasks: Map = new Map< WorkerWithMessageChannel, number >() + /* eslint-enable @typescript-eslint/indent */ protected _id: number = 0 @@ -61,6 +65,7 @@ export default class FixedThreadPool { if (!isMainThread) { throw new Error('Cannot start a thread pool from a worker thread !!!') } + // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check if (!this.filePath) { throw new Error('Please specify a file with a worker implementation') } @@ -82,30 +87,36 @@ export default class FixedThreadPool { * @param data The input for the task specified. * @returns Promise that is resolved when the task is done. */ - public async execute (data: Data): Promise { + // eslint-disable-next-line @typescript-eslint/promise-function-async + public execute (data: Data): Promise { // configure worker to handle message with the specified task const worker = this._chooseWorker() - this.tasks.set(worker, this.tasks.get(worker) + 1) + this.tasks.set(worker, (this.tasks.get(worker) ?? 0) + 1) const id = ++this._id const res = this._execute(worker, id) worker.postMessage({ data: data || _void, _id: id }) return res } + // eslint-disable-next-line @typescript-eslint/promise-function-async protected _execute ( worker: WorkerWithMessageChannel, id: number ): Promise { return new Promise((resolve, reject) => { - const listener = (message) => { + const listener = (message: { + _id: number + error?: string + data: Response + }): void => { if (message._id === id) { - worker.port2.removeListener('message', listener) - this.tasks.set(worker, this.tasks.get(worker) - 1) + worker.port2?.removeListener('message', listener) + this.tasks.set(worker, (this.tasks.get(worker) ?? 0) - 1) if (message.error) reject(message.error) else resolve(message.data) } } - worker.port2.on('message', listener) + worker.port2?.on('message', listener) }) } @@ -123,10 +134,10 @@ export default class FixedThreadPool { const worker: WorkerWithMessageChannel = new Worker(this.filePath, { env: SHARE_ENV }) - worker.on('error', this.opts.errorHandler || empty) - worker.on('online', this.opts.onlineHandler || empty) + worker.on('error', this.opts.errorHandler ?? empty) + worker.on('online', this.opts.onlineHandler ?? empty) // TODO handle properly when a thread exit - worker.on('exit', this.opts.exitHandler || empty) + worker.on('exit', this.opts.exitHandler ?? empty) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) @@ -134,7 +145,7 @@ export default class FixedThreadPool { worker.port2 = port2 // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size - worker.port2.setMaxListeners(this.opts.maxTasks || 1000) + worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) // init tasks map this.tasks.set(worker, 0) return worker diff --git a/src/index.ts b/src/index.ts index a5fe1519..db6960b8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,13 +2,13 @@ import DynamicThreadPool from './dynamic' import FixedThreadPool from './fixed' import { ThreadWorker } from './workers' -export type { +export { Draft, FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed' -export type { DynamicThreadPoolOptions } from './dynamic' -export type { ThreadWorkerOptions } from './workers' +export { DynamicThreadPoolOptions } from './dynamic' +export { ThreadWorkerOptions } from './workers' export { FixedThreadPool, DynamicThreadPool, ThreadWorker } module.exports = { FixedThreadPool, DynamicThreadPool, ThreadWorker } diff --git a/src/workers.ts b/src/workers.ts index 990f2208..ec03c07e 100644 --- a/src/workers.ts +++ b/src/workers.ts @@ -1,3 +1,5 @@ +/* eslint-disable @typescript-eslint/strict-boolean-expressions */ + import { isMainThread, parentPort } from 'worker_threads' import { AsyncResource } from 'async_hooks' @@ -30,8 +32,8 @@ export class ThreadWorker extends AsyncResource { protected readonly maxInactiveTime: number protected readonly async: boolean protected lastTask: number - protected readonly interval: NodeJS.Timeout - protected parent: any + protected readonly interval?: NodeJS.Timeout + protected parent?: MessagePort public constructor ( fn: (data: Data) => Response, @@ -39,7 +41,7 @@ export class ThreadWorker extends AsyncResource { ) { super('worker-thread-pool:pioardi') - this.maxInactiveTime = this.opts.maxInactiveTime || 1000 * 60 + this.maxInactiveTime = this.opts.maxInactiveTime ?? 1000 * 60 this.async = !!this.opts.async this.lastTask = Date.now() if (!fn) throw new Error('Fn parameter is mandatory') @@ -51,30 +53,38 @@ export class ThreadWorker extends AsyncResource { ) this._checkAlive.bind(this)() } - parentPort.on('message', (value) => { - if (value && value.data && value._id) { - // here you will receive messages - // console.log('This is the main thread ' + isMainThread) - if (this.async) { - this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) - } else { - this.runInAsyncScope(this._run.bind(this), this, fn, value) + parentPort?.on( + 'message', + (value: { + data?: Response + _id?: number + parent?: MessagePort + kill?: number + }) => { + if (value?.data && value._id) { + // here you will receive messages + // console.log('This is the main thread ' + isMainThread) + if (this.async) { + this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this._run.bind(this), this, fn, value) + } + } else if (value.parent) { + // save the port to communicate with the main thread + // this will be received once + this.parent = value.parent + } else if (value.kill) { + // here is time to kill this thread, just clearing the interval + if (this.interval) clearInterval(this.interval) + this.emitDestroy() } - } else if (value.parent) { - // save the port to communicate with the main thread - // this will be received once - this.parent = value.parent - } else if (value.kill) { - // here is time to kill this thread, just clearing the interval - clearInterval(this.interval) - this.emitDestroy() } - }) + ) } protected _checkAlive (): void { if (Date.now() - this.lastTask > this.maxInactiveTime) { - this.parent.postMessage({ kill: 1 }) + this.parent?.postMessage({ kill: 1 }) } } @@ -84,10 +94,10 @@ export class ThreadWorker extends AsyncResource { ): void { try { const res = fn(value.data) - this.parent.postMessage({ data: res, _id: value._id }) + this.parent?.postMessage({ data: res, _id: value._id }) this.lastTask = Date.now() } catch (e) { - this.parent.postMessage({ error: e, _id: value._id }) + this.parent?.postMessage({ error: e, _id: value._id }) this.lastTask = Date.now() } } @@ -97,12 +107,12 @@ export class ThreadWorker extends AsyncResource { value: { readonly data: Data, readonly _id: number } ): void { fn(value.data) - .then((res) => { - this.parent.postMessage({ data: res, _id: value._id }) + .then(res => { + this.parent?.postMessage({ data: res, _id: value._id }) this.lastTask = Date.now() }) - .catch((e) => { - this.parent.postMessage({ error: e, _id: value._id }) + .catch(e => { + this.parent?.postMessage({ error: e, _id: value._id }) this.lastTask = Date.now() }) } diff --git a/tsconfig.json b/tsconfig.json index 39bd6d90..75b92b54 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,8 @@ "module": "CommonJS", "outDir": "lib", "esModuleInterop": true, - "declaration": true + "declaration": true, + "strict": true }, "include": ["src/**/*.ts"], "exclude": ["node_modules"] -- 2.34.1