The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.0.0] - not released yet
+
+### Breaking Changes
+
+We changed some internal structures, but you shouldn't be too affected by them as these are internal changes.
+
+#### New `export` strategy
+
+```js
+// Before
+const DynamicThreadPool = require("poolifier/lib/dynamic");
+// After
+const { DynamicThreadPool } = require("poolifier/lib/dynamic");
+```
+
+But you should always prefer just using
+
+```js
+const { DynamicThreadPool } = require("poolifier");
+```
+
+#### Internal (protected) methods has renamed
+
+Those methods are not intended to be used from final users
+
+- `_chooseWorker` => `chooseWorker`
+- `_newWorker` => `newWorker`
+- `_execute` => `internalExecute`
+- `_chooseWorker` => `chooseWorker`
+- `_checkAlive` => `checkAlive`
+- `_run` => `run`
+- `_runAsync` => `runAsync`
+
## [1.1.0] - 2020-21-05
### Added
this.emitter = new MyEmitter()
}
- protected _chooseWorker (): WorkerWithMessageChannel {
+ protected chooseWorker (): WorkerWithMessageChannel {
let worker: WorkerWithMessageChannel | undefined
for (const entry of this.tasks) {
if (entry[1] === 0) {
} else {
if (this.workers.length === this.max) {
this.emitter.emit('FullPool')
- return super._chooseWorker()
+ return super.chooseWorker()
}
// all workers are busy create a new worker
- const worker = this._newWorker()
+ const worker = this.newWorker()
worker.port2?.on('message', (message: { kill?: number }) => {
if (message.kill) {
worker.postMessage({ kill: 1 })
import { MessageChannel, SHARE_ENV, Worker, isMainThread } from 'worker_threads'
-function empty (): void {}
-const _void = {}
-
export type Draft<T> = { -readonly [P in keyof T]?: T[P] }
export type WorkerWithMessageChannel = Worker & Draft<MessageChannel>
>()
/* eslint-enable @typescript-eslint/indent */
- protected _id: number = 0
+ protected id: number = 0
/**
* @param numThreads Num of threads for this worker pool.
}
for (let i = 1; i <= this.numThreads; i++) {
- this._newWorker()
+ this.newWorker()
}
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
public execute (data: Data): Promise<Response> {
// configure worker to handle message with the specified task
- const worker = this._chooseWorker()
+ const worker = this.chooseWorker()
const previousWorkerIndex = this.tasks.get(worker)
if (previousWorkerIndex !== undefined) {
this.tasks.set(worker, previousWorkerIndex + 1)
} else {
throw Error('Worker could not be found in tasks map')
}
- const id = ++this._id
- const res = this._execute(worker, id)
- worker.postMessage({ data: data || _void, _id: id })
+ const id = ++this.id
+ const res = this.internalExecute(worker, id)
+ worker.postMessage({ data: data || {}, id: id })
return res
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
- protected _execute (
+ protected internalExecute (
worker: WorkerWithMessageChannel,
id: number
): Promise<Response> {
return new Promise((resolve, reject) => {
const listener = (message: {
- _id: number
+ id: number
error?: string
data: Response
}): void => {
- if (message._id === id) {
+ if (message.id === id) {
worker.port2?.removeListener('message', listener)
const previousWorkerIndex = this.tasks.get(worker)
if (previousWorkerIndex !== undefined) {
})
}
- protected _chooseWorker (): WorkerWithMessageChannel {
+ protected chooseWorker (): WorkerWithMessageChannel {
if (this.workers.length - 1 === this.nextWorker) {
this.nextWorker = 0
return this.workers[this.nextWorker]
}
}
- protected _newWorker (): WorkerWithMessageChannel {
+ protected newWorker (): WorkerWithMessageChannel {
const worker: WorkerWithMessageChannel = new Worker(this.filePath, {
env: SHARE_ENV
})
- worker.on('error', this.opts.errorHandler ?? empty)
- worker.on('online', this.opts.onlineHandler ?? empty)
+ worker.on('error', this.opts.errorHandler ?? (() => {}))
+ worker.on('online', this.opts.onlineHandler ?? (() => {}))
// TODO handle properly when a thread exit
- worker.on('exit', this.opts.exitHandler ?? empty)
+ worker.on('exit', this.opts.exitHandler ?? (() => {}))
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
// keep the worker active
if (!isMainThread) {
this.interval = setInterval(
- this._checkAlive.bind(this),
+ this.checkAlive.bind(this),
this.maxInactiveTime / 2
)
- this._checkAlive.bind(this)()
+ this.checkAlive.bind(this)()
}
parentPort?.on(
'message',
(value: {
data?: Response
- _id?: number
+ id?: number
parent?: MessagePort
kill?: number
}) => {
- if (value?.data && value._id) {
+ if (value?.data && value.id) {
// here you will receive messages
// console.log('This is the main thread ' + isMainThread)
if (this.async) {
- this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
+ this.runInAsyncScope(this.runAsync.bind(this), this, fn, value)
} else {
- this.runInAsyncScope(this._run.bind(this), this, fn, value)
+ this.runInAsyncScope(this.run.bind(this), this, fn, value)
}
} else if (value.parent) {
// save the port to communicate with the main thread
)
}
- protected _checkAlive (): void {
+ protected checkAlive (): void {
if (Date.now() - this.lastTask > this.maxInactiveTime) {
this.parent?.postMessage({ kill: 1 })
}
}
- protected _run (
+ protected run (
fn: (data: Data) => Response,
- value: { readonly data: Data, readonly _id: number }
+ value: { readonly data: Data, readonly id: number }
): void {
try {
const res = fn(value.data)
- this.parent?.postMessage({ data: res, _id: value._id })
+ this.parent?.postMessage({ data: res, id: value.id })
this.lastTask = Date.now()
} catch (e) {
- this.parent?.postMessage({ error: e, _id: value._id })
+ this.parent?.postMessage({ error: e, id: value.id })
this.lastTask = Date.now()
}
}
- protected _runAsync (
+ protected runAsync (
fn: (data: Data) => Promise<Response>,
- value: { readonly data: Data, readonly _id: number }
+ value: { readonly data: Data, readonly id: number }
): void {
fn(value.data)
.then(res => {
- this.parent?.postMessage({ data: res, _id: value._id })
+ this.parent?.postMessage({ data: res, id: value.id })
this.lastTask = Date.now()
})
.catch(e => {
- this.parent?.postMessage({ error: e, _id: value._id })
+ this.parent?.postMessage({ error: e, id: value.id })
this.lastTask = Date.now()
})
}
it('Choose worker round robin test', async () => {
const results = new Set()
for (let i = 0; i < numThreads; i++) {
- results.add(pool._chooseWorker().threadId)
+ results.add(pool.chooseWorker().threadId)
}
expect(results.size).toBe(numThreads)
})