Simplify worker choosing (#138)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb
S
1import EventEmitter from 'events'
2import type { MessageValue } from '../utility-types'
3import type { IPool } from './pool'
4
/**
 * Callback run when a worker emits an `error` event.
 * It is invoked with the worker bound as `this`.
 */
export type ErrorHandler<Worker> = (this: Worker, error: Error) => void

/**
 * Callback run when a worker emits an `online` event.
 * It is invoked with the worker bound as `this`.
 */
export type OnlineHandler<Worker> = (this: Worker) => void

/**
 * Callback run when a worker emits an `exit` event.
 * It is invoked with the worker bound as `this` and receives a numeric code.
 */
export type ExitHandler<Worker> = (this: Worker, exitCode: number) => void
8
/**
 * Minimal contract a worker must fulfil to be managed by a pool:
 * it must allow subscribing to its `error`, `online` and `exit` events.
 */
export interface IWorker {
  /** Subscribes `listener` to the worker's `error` event. */
  on(event: 'error', listener: ErrorHandler<this>): void
  /** Subscribes `listener` to the worker's `online` event. */
  on(event: 'online', listener: OnlineHandler<this>): void
  /** Subscribes `listener` to the worker's `exit` event. */
  on(event: 'exit', listener: ExitHandler<this>): void
}
14
/**
 * Optional settings accepted by a pool.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener attached to the `error` event of every worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Listener attached to the `online` event of every worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Listener attached to the `exit` event of every worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * Value used to set `maxListeners` on event emitters (workers are event
   * emitters). Its only purpose is to avoid unhelpful warning messages.
   *
   * @default 1000
   */
  maxTasks?: number
}
35
/**
 * Event emitter type used for a pool's `emitter` property.
 * It adds nothing on top of `EventEmitter`.
 */
class PoolEmitter extends EventEmitter {}
37
/**
 * Base implementation shared by concrete pools: it creates the workers,
 * tracks how many tasks each worker is currently running and dispatches
 * tasks to them.
 *
 * @template Worker Type of worker managed by the pool.
 * @template Data Type of the data sent to a worker.
 * @template Response Type of the response received from a worker.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /** Workers currently owned by the pool. */
  public readonly workers: Worker[] = []

  /** Index in `workers` of the worker returned by the last `chooseWorker` call. */
  public nextWorker: number = 0

  /**
   * Worker as key, number of tasks currently running on that worker as value.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** Event emitter exposed by the pool; this base class never emits on it. */
  public readonly emitter: PoolEmitter

  /** Id of the last submitted task; incremented on every `execute` call. */
  protected id: number = 0

  /**
   * Creates the pool and its `numWorkers` workers.
   *
   * @param numWorkers Number of workers created up front.
   * @param filePath Path to the file containing the worker implementation.
   * @param opts Pool options.
   * @throws Error when `isMain()` returns `false` or `filePath` is empty.
   */
  public constructor (
    public readonly numWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    // Create the emitter before any hook or worker creation runs so that it
    // is never observed as `undefined` by code invoked during construction.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numWorkers; i++) {
      this.internalNewWorker()
    }
  }

  /**
   * Hook called by the constructor before the workers are created.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Tells whether the current code runs where a pool may be started.
   * The constructor throws when this returns `false`.
   */
  protected abstract isMain (): boolean

  /**
   * Destroys every worker of the pool, one after the other.
   *
   * @returns Promise resolved once `destroyWorker` has completed for all workers.
   */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: if `destroyWorker` (directly or not) ends up
    // calling `removeWorker`, mutating `workers` during iteration would make
    // the loop skip elements.
    for (const worker of [...this.workers]) {
      await this.destroyWorker(worker)
    }
  }

  /**
   * Destroys the given worker.
   *
   * @param worker Worker to destroy.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Sends a task message to the given worker.
   *
   * @param worker Worker that must receive the message.
   * @param message Message to send.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Increments the number of running tasks recorded for `worker`.
   *
   * @param worker Worker that is about to receive a task.
   * @throws Error when the worker is not present in the `tasks` map.
   */
  protected addWorker (worker: Worker): void {
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, runningTasks + 1)
  }

  /**
   * Decrements the number of running tasks recorded for `worker`.
   *
   * Does nothing when the worker is no longer tracked (it may have been
   * removed while its task was in flight) and never goes below zero, so that
   * a late response cannot prevent the pending promise from settling.
   *
   * @param worker Worker that has just completed a task.
   */
  protected decreaseWorkerTasks (worker: Worker): void {
    const runningTasks = this.tasks.get(worker)
    if (runningTasks !== undefined) {
      this.tasks.set(worker, Math.max(runningTasks - 1, 0))
    }
  }

  /**
   * Removes the given worker from the pool data structures.
   *
   * @param worker Worker to forget.
   */
  protected removeWorker (worker: Worker): void {
    const workerIndex = this.workers.indexOf(worker)
    // `indexOf` returns -1 for an unknown worker and `splice(-1, 1)` would
    // remove the LAST worker instead, hence the guard.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified.
   * @returns Promise that is resolved when the task is done.
   */
  public execute (data: Data): Promise<Response> {
    // configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.addWorker(worker)
    const id = ++this.id
    // Register the response listener before sending, so a reply can never be missed.
    const res = this.internalExecute(worker, id)
    // `??` (not `||`): only a missing payload is replaced by an empty object,
    // valid falsy payloads such as `0`, `''` or `false` are sent as they are.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id })
    return res
  }

  /**
   * Registers a listener for the messages coming from a worker.
   *
   * @param port Worker to listen to.
   * @param listener Function called with each received message.
   */
  protected abstract registerWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Removes a listener previously registered with `registerWorkerMessageListener`.
   *
   * @param port Worker the listener was registered on.
   * @param listener Listener to remove.
   */
  protected abstract unregisterWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Waits for the response to the task identified by `id`.
   *
   * @param worker Worker running the task.
   * @param id Id of the task whose response is awaited.
   * @returns Promise resolved with the response data, or rejected with the
   * message `error` when the response carries one.
   */
  protected internalExecute (worker: Worker, id: number): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        // Messages answering other tasks of the same worker are ignored.
        if (message.id === id) {
          this.unregisterWorkerMessageListener(worker, listener)
          // The task is finished: the worker has one running task less.
          this.decreaseWorkerTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Chooses the worker that will run the next task by cycling through
   * `workers` (the index is advanced first, then the worker is returned).
   *
   * @returns The chosen worker.
   * @throws Error when the pool has no worker.
   */
  protected chooseWorker (): Worker {
    if (this.workers.length === 0) {
      throw new Error('Cannot choose a worker: the pool has no workers')
    }
    // The modulo keeps the index in bounds even when `workers` has shrunk
    // since the previous call.
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Creates a new worker.
   *
   * @returns The created worker.
   */
  protected abstract newWorker (): Worker

  /**
   * Hook called right after a new worker has been added to the pool.
   *
   * @param worker The worker that has just been added.
   */
  protected abstract afterNewWorkerPushed (worker: Worker): void

  /**
   * Creates a worker, attaches the configured event handlers to it and
   * registers it in the pool.
   *
   * @returns The created worker.
   */
  protected internalNewWorker (): Worker {
    const worker: Worker = this.newWorker()
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a worker exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    this.workers.push(worker)
    // Init the tasks map before the hook runs, so the hook sees a fully
    // registered worker.
    this.tasks.set(worker, 0)
    this.afterNewWorkerPushed(worker)
    return worker
  }
}