From fa0f5b28a536f2bc5ebe395a0bac90b8a35d149e Mon Sep 17 00:00:00 2001 From: Shinigami Date: Tue, 9 Feb 2021 10:57:51 +0100 Subject: [PATCH] Rename functions and methods to not use prefix underscore (#86) * Rename functions and methods to not use prefix underscore * Update changelog * Add sentence --- CHANGELOG.md | 33 +++++++++++++++++++++++++++++++++ src/dynamic.ts | 6 +++--- src/fixed.ts | 31 ++++++++++++++----------------- src/workers.ts | 30 +++++++++++++++--------------- tests/fixed.test.js | 2 +- 5 files changed, 66 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf61e781..8479f042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,39 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.0.0] - not released yet + +### Breaking Changes + +We changed some internal structures, but you shouldn't be too affected by them as these are internal changes. + +#### New `export` strategy + +```js +// Before +const DynamicThreadPool = require("poolifier/lib/dynamic"); +// After +const { DynamicThreadPool } = require("poolifier/lib/dynamic"); +``` + +But you should always prefer just using + +```js +const { DynamicThreadPool } = require("poolifier"); +``` + +#### Internal (protected) methods has renamed + +Those methods are not intended to be used from final users + +- `_chooseWorker` => `chooseWorker` +- `_newWorker` => `newWorker` +- `_execute` => `internalExecute` +- `_chooseWorker` => `chooseWorker` +- `_checkAlive` => `checkAlive` +- `_run` => `run` +- `_runAsync` => `runAsync` + ## [1.1.0] - 2020-21-05 ### Added diff --git a/src/dynamic.ts b/src/dynamic.ts index ca6e4849..09b9bedb 100644 --- a/src/dynamic.ts +++ b/src/dynamic.ts @@ -44,7 +44,7 @@ export class DynamicThreadPool< this.emitter = new MyEmitter() } - protected _chooseWorker (): WorkerWithMessageChannel { + protected chooseWorker (): WorkerWithMessageChannel { let worker: WorkerWithMessageChannel | undefined for (const entry of this.tasks) { if (entry[1] === 0) { @@ -59,10 +59,10 @@ export class DynamicThreadPool< } else { if (this.workers.length === this.max) { this.emitter.emit('FullPool') - return super._chooseWorker() + return super.chooseWorker() } // all workers are busy create a new worker - const worker = this._newWorker() + const worker = this.newWorker() worker.port2?.on('message', (message: { kill?: number }) => { if (message.kill) { worker.postMessage({ kill: 1 }) diff --git a/src/fixed.ts b/src/fixed.ts index 39ee5954..b499c269 100644 --- a/src/fixed.ts +++ b/src/fixed.ts @@ -2,9 +2,6 @@ import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads' -function empty (): void {} -const _void = {} - export type Draft = { -readonly [P in keyof T]?: T[P] } export type WorkerWithMessageChannel = Worker & Draft @@ -50,7 +47,7 @@ export class FixedThreadPool { >() /* eslint-enable @typescript-eslint/indent */ - protected _id: number = 0 + protected id: number = 0 /** * @param numThreads Num of threads for this worker pool. @@ -71,7 +68,7 @@ export class FixedThreadPool { } for (let i = 1; i <= this.numThreads; i++) { - this._newWorker() + this.newWorker() } } @@ -90,31 +87,31 @@ export class FixedThreadPool { // eslint-disable-next-line @typescript-eslint/promise-function-async public execute (data: Data): Promise { // configure worker to handle message with the specified task - const worker = this._chooseWorker() + const worker = this.chooseWorker() const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { this.tasks.set(worker, previousWorkerIndex + 1) } else { throw Error('Worker could not be found in tasks map') } - const id = ++this._id - const res = this._execute(worker, id) - worker.postMessage({ data: data || _void, _id: id }) + const id = ++this.id + const res = this.internalExecute(worker, id) + worker.postMessage({ data: data || {}, id: id }) return res } // eslint-disable-next-line @typescript-eslint/promise-function-async - protected _execute ( + protected internalExecute ( worker: WorkerWithMessageChannel, id: number ): Promise { return new Promise((resolve, reject) => { const listener = (message: { - _id: number + id: number error?: string data: Response }): void => { - if (message._id === id) { + if (message.id === id) { worker.port2?.removeListener('message', listener) const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { @@ -130,7 +127,7 @@ export class FixedThreadPool { }) } - protected _chooseWorker (): WorkerWithMessageChannel { + protected chooseWorker (): WorkerWithMessageChannel { if (this.workers.length - 1 === this.nextWorker) { this.nextWorker = 0 return this.workers[this.nextWorker] @@ -140,14 +137,14 @@ export class FixedThreadPool { } } - protected _newWorker (): WorkerWithMessageChannel { + protected newWorker (): WorkerWithMessageChannel { const worker: WorkerWithMessageChannel = new Worker(this.filePath, { env: SHARE_ENV }) - worker.on('error', this.opts.errorHandler ?? empty) - worker.on('online', this.opts.onlineHandler ?? empty) + worker.on('error', this.opts.errorHandler ?? (() => {})) + worker.on('online', this.opts.onlineHandler ?? (() => {})) // TODO handle properly when a thread exit - worker.on('exit', this.opts.exitHandler ?? empty) + worker.on('exit', this.opts.exitHandler ?? (() => {})) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) diff --git a/src/workers.ts b/src/workers.ts index ec03c07e..9ead4d47 100644 --- a/src/workers.ts +++ b/src/workers.ts @@ -48,26 +48,26 @@ export class ThreadWorker extends AsyncResource { // keep the worker active if (!isMainThread) { this.interval = setInterval( - this._checkAlive.bind(this), + this.checkAlive.bind(this), this.maxInactiveTime / 2 ) - this._checkAlive.bind(this)() + this.checkAlive.bind(this)() } parentPort?.on( 'message', (value: { data?: Response - _id?: number + id?: number parent?: MessagePort kill?: number }) => { - if (value?.data && value._id) { + if (value?.data && value.id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) if (this.async) { - this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) + this.runInAsyncScope(this.runAsync.bind(this), this, fn, value) } else { - this.runInAsyncScope(this._run.bind(this), this, fn, value) + this.runInAsyncScope(this.run.bind(this), this, fn, value) } } else if (value.parent) { // save the port to communicate with the main thread @@ -82,37 +82,37 @@ export class ThreadWorker extends AsyncResource { ) } - protected _checkAlive (): void { + protected checkAlive (): void { if (Date.now() - this.lastTask > this.maxInactiveTime) { this.parent?.postMessage({ kill: 1 }) } } - protected _run ( + protected run ( fn: (data: Data) => Response, - value: { readonly data: Data, readonly _id: number } + value: { readonly data: Data, readonly id: number } ): void { try { const res = fn(value.data) - this.parent?.postMessage({ data: res, _id: value._id }) + this.parent?.postMessage({ data: res, id: value.id }) this.lastTask = Date.now() } catch (e) { - this.parent?.postMessage({ error: e, _id: value._id }) + this.parent?.postMessage({ error: e, id: value.id }) this.lastTask = Date.now() } } - protected _runAsync ( + protected runAsync ( fn: (data: Data) => Promise, - value: { readonly data: Data, readonly _id: number } + value: { readonly data: Data, readonly id: number } ): void { fn(value.data) .then(res => { - this.parent?.postMessage({ data: res, _id: value._id }) + this.parent?.postMessage({ data: res, id: value.id }) this.lastTask = Date.now() }) .catch(e => { - this.parent?.postMessage({ error: e, _id: value._id }) + this.parent?.postMessage({ error: e, id: value.id }) this.lastTask = Date.now() }) } diff --git a/tests/fixed.test.js b/tests/fixed.test.js index 12c705cc..b8f87264 100644 --- a/tests/fixed.test.js +++ b/tests/fixed.test.js @@ -17,7 +17,7 @@ describe('Fixed thread pool test suite ', () => { it('Choose worker round robin test', async () => { const results = new Set() for (let i = 0; i < numThreads; i++) { - results.add(pool._chooseWorker().threadId) + results.add(pool.chooseWorker().threadId) } expect(results.size).toBe(numThreads) }) -- 2.34.1