1 import EventEmitter from
'events'
2 import type { MessageValue
} from
'../utility-types'
3 import type { IPool
} from
'./pool'
/**
 * Callback signature for a worker's `'error'` event.
 *
 * The handler is invoked with the worker as `this` and receives the error as its only argument.
 *
 * @typeParam Worker - Type of the worker the handler is bound to.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback signature for a worker's `'online'` event.
 *
 * The handler is invoked with the worker as `this` and takes no arguments.
 *
 * @typeParam Worker - Type of the worker the handler is bound to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback signature for a worker's `'exit'` event.
 *
 * The handler is invoked with the worker as `this` and receives the numeric exit code.
 *
 * @typeParam Worker - Type of the worker the handler is bound to.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Minimal event-subscription contract of a worker.
 *
 * Only the three lifecycle events below are part of the contract; every handler is typed
 * with `this` bound to the concrete worker type.
 */
export interface IWorker {
  /** Registers `handler` for the `'error'` event. */
  on(event: 'error', handler: ErrorHandler<this>): void
  /** Registers `handler` for the `'online'` event. */
  on(event: 'online', handler: OnlineHandler<this>): void
  /** Registers `handler` for the `'exit'` event. */
  on(event: 'exit', handler: ExitHandler<this>): void
}
15 export interface PoolOptions
<Worker
> {
17 * A function that will listen for error event on each worker.
19 errorHandler
?: ErrorHandler
<Worker
>
21 * A function that will listen for online event on each worker.
23 onlineHandler
?: OnlineHandler
<Worker
>
25 * A function that will listen for exit event on each worker.
27 exitHandler
?: ExitHandler
<Worker
>
29 * Only used to avoid useless warning messages: it sets `maxListeners` on the event emitters (workers are event emitters).
/**
 * Event emitter type used for a pool's `emitter` property.
 *
 * Adds nothing to `EventEmitter`; it only gives the pool's emitter a dedicated name.
 */
class PoolEmitter extends EventEmitter {}
38 export abstract class AbstractPool
<
39 Worker
extends IWorker
,
42 > implements IPool
<Data
, Response
> {
43 public readonly workers
: Worker
[] = []
44 public nextWorker
: number = 0
47 * Maps each worker (the `Worker` object itself is the key) to an integer task counter.
49 public readonly tasks
: Map
<Worker
, number> = new Map
<Worker
, number>()
51 public readonly emitter
: PoolEmitter
53 protected id
: number = 0
56 public readonly numWorkers
: number,
57 public readonly filePath
: string,
58 public readonly opts
: PoolOptions
<Worker
> = { maxTasks
: 1000 }
61 throw new Error('Cannot start a pool from a worker!')
63 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
65 throw new Error('Please specify a file with a worker implementation')
70 for (let i
= 1; i
<= this.numWorkers
; i
++) {
71 this.internalNewWorker()
74 this.emitter
= new PoolEmitter()
77 protected setupHook (): void {
/**
 * Implemented by concrete pools.
 *
 * @returns `true` when the current context is the main one — presumably as opposed to
 * running inside a worker; confirm against the concrete implementations.
 */
protected abstract isMain (): boolean
/**
 * Destroys every worker currently held in `workers`.
 *
 * Workers are handled one after another: each `destroyWorker` call is awaited
 * before the next one starts.
 *
 * @returns Promise that resolves once `destroyWorker` has completed for every worker.
 */
public async destroy (): Promise<void> {
  for (const worker of this.workers) {
    await this.destroyWorker(worker)
  }
}
/**
 * Shuts down a single worker. Implemented by concrete pools.
 *
 * `destroy` awaits the result, so an implementation may be synchronous or return a promise.
 *
 * @param worker The worker to shut down.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
91 protected abstract sendToWorker (
93 message
: MessageValue
<Data
>
/**
 * Increments the task counter stored for `worker` in the `tasks` map.
 *
 * Despite its name, this method does not add a worker to the pool; it only bumps
 * the counter of a worker that already has an entry in `tasks`.
 *
 * @param worker Worker whose counter is incremented.
 * @throws Error if `worker` has no entry in the `tasks` map.
 */
protected addWorker (worker: Worker): void {
  const taskCount = this.tasks.get(worker)
  if (taskCount === undefined) {
    throw new Error('Worker could not be found in tasks map')
  }
  this.tasks.set(worker, taskCount + 1)
}
/**
 * Removes `worker` from the pool's bookkeeping: the `workers` list and the `tasks` map.
 *
 * Calling it with a worker that is not (or no longer) in the pool is a no-op for the
 * `workers` list.
 *
 * @param worker The worker to forget.
 */
protected removeWorker (worker: Worker): void {
  const workerIndex = this.workers.indexOf(worker)
  // `indexOf` yields -1 for an unknown worker, and `splice(-1, 1)` would silently
  // drop the LAST worker of the list instead of doing nothing.
  if (workerIndex !== -1) {
    this.workers.splice(workerIndex, 1)
  }
  this.tasks.delete(worker)
}
113 * Executes the task specified in the constructor, passing it the `data` parameter.
115 * @param data The input for the task specified.
116 * @returns Promise that is resolved when the task is done.
118 public execute (data
: Data
): Promise
<Response
> {
119 // configure worker to handle message with the specified task
120 const worker
= this.chooseWorker()
121 this.addWorker(worker
)
123 const res
= this.internalExecute(worker
, id
)
124 this.sendToWorker(worker
, { data
: data
|| ({} as Data
), id
: id
})
128 protected abstract registerWorkerMessageListener (
130 listener
: (message
: MessageValue
<Response
>) => void
133 protected abstract unregisterWorkerMessageListener (
135 listener
: (message
: MessageValue
<Response
>) => void
/**
 * Waits for the reply of `worker` to the task identified by `id`.
 *
 * A message listener is registered on the worker; messages carrying a different `id`
 * are ignored. When the matching message arrives, the listener is unregistered, the
 * worker's task counter is decremented, and the promise is settled.
 *
 * @param worker Worker the task was sent to.
 * @param id Identifier of the task whose reply is awaited.
 * @returns Promise resolved with `message.data`, or rejected with `message.error` when it is set.
 */
protected internalExecute (worker: Worker, id: number): Promise<Response> {
  return new Promise<Response>((resolve, reject) => {
    const listener = (message: MessageValue<Response>): void => {
      // Replies to other tasks running on the same worker are not ours to handle.
      if (message.id !== id) return

      this.unregisterWorkerMessageListener(worker, listener)

      // The task is done: undo the increment made when it was submitted. Calling
      // `addWorker` here would increment a second time, so counters could never
      // go back down. A worker already removed from `tasks` is skipped rather
      // than throwing, so the promise is still settled.
      const pendingTasks = this.tasks.get(worker)
      if (pendingTasks !== undefined) {
        this.tasks.set(worker, pendingTasks - 1)
      }

      if (message.error) reject(message.error)
      else resolve(message.data as Response)
    }
    this.registerWorkerMessageListener(worker, listener)
  })
}
/**
 * Picks the worker for the next task using round-robin.
 *
 * `nextWorker` is advanced first (wrapping to 0 after the last worker) and the worker
 * at the new index is returned.
 *
 * @returns The worker at the updated `nextWorker` index.
 */
protected chooseWorker (): Worker {
  const lastIndex = this.workers.length - 1
  // `>=` rather than `===`: if the list shrank (see `removeWorker`) and `nextWorker`
  // is already past the end, a strict equality test never matches again and the
  // index would keep growing, returning `undefined` on every call.
  this.nextWorker = this.nextWorker >= lastIndex ? 0 : this.nextWorker + 1
  return this.workers[this.nextWorker]
}
/**
 * Creates a new worker instance. Implemented by concrete pools.
 *
 * Called by `internalNewWorker`, which then attaches the event handlers and
 * registers the worker in the pool.
 *
 * @returns The newly created worker.
 */
protected abstract newWorker (): Worker
/**
 * Hook invoked by `internalNewWorker` right after the new worker has been pushed
 * to `workers`, and before its entry in `tasks` is initialised.
 *
 * @param worker The worker that was just added to `workers`.
 */
protected abstract afterNewWorkerPushed (worker: Worker): void
162 protected internalNewWorker (): Worker
{
163 const worker
: Worker
= this.newWorker()
164 worker
.on('error', this.opts
.errorHandler
?? (() => {}))
165 worker
.on('online', this.opts
.onlineHandler
?? (() => {}))
166 // TODO handle properly when a worker exit
167 worker
.on('exit', this.opts
.exitHandler
?? (() => {}))
168 this.workers
.push(worker
)
169 this.afterNewWorkerPushed(worker
)
171 this.tasks
.set(worker
, 0)