Encapsulate logic of cluster and thread worker/pool (#116)
[poolifier.git] / src / pools / abstract-pool.ts
1 import EventEmitter from 'events'
2 import type { MessageValue } from '../utility-types'
3 import type { IPool } from './pool'
4
5 export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
6 export type OnlineHandler<Worker> = (this: Worker) => void
7 export type ExitHandler<Worker> = (this: Worker, code: number) => void
8
9 export interface IWorker {
10 on(event: 'error', handler: ErrorHandler<this>): void
11 on(event: 'online', handler: OnlineHandler<this>): void
12 on(event: 'exit', handler: ExitHandler<this>): void
13 }
14
15 export interface PoolOptions<Worker> {
16 /**
17 * A function that will listen for error event on each worker.
18 */
19 errorHandler?: ErrorHandler<Worker>
20 /**
21 * A function that will listen for online event on each worker.
22 */
23 onlineHandler?: OnlineHandler<Worker>
24 /**
25 * A function that will listen for exit event on each worker.
26 */
27 exitHandler?: ExitHandler<Worker>
28 /**
29 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
30 *
31 * @default 1000
32 */
33 maxTasks?: number
34 }
35
36 class PoolEmitter extends EventEmitter {}
37
38 export abstract class AbstractPool<
39 Worker extends IWorker,
40 // eslint-disable-next-line @typescript-eslint/no-explicit-any
41 Data = any,
42 // eslint-disable-next-line @typescript-eslint/no-explicit-any
43 Response = any
44 > implements IPool<Data, Response> {
45 public readonly workers: Worker[] = []
46 public nextWorker: number = 0
47
48 /**
49 * `workerId` as key and an integer value
50 */
51 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
52
53 public readonly emitter: PoolEmitter
54
55 protected id: number = 0
56
57 public constructor (
58 public readonly numWorkers: number,
59 public readonly filePath: string,
60 public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
61 ) {
62 if (!this.isMain()) {
63 throw new Error('Cannot start a pool from a worker!')
64 }
65 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
66 if (!this.filePath) {
67 throw new Error('Please specify a file with a worker implementation')
68 }
69
70 this.setupHook()
71
72 for (let i = 1; i <= this.numWorkers; i++) {
73 this.internalNewWorker()
74 }
75
76 this.emitter = new PoolEmitter()
77 }
78
79 protected setupHook (): void {
80 // Can be overridden
81 }
82
83 protected abstract isMain (): boolean
84
85 public async destroy (): Promise<void> {
86 for (const worker of this.workers) {
87 await this.destroyWorker(worker)
88 }
89 }
90
91 protected abstract destroyWorker (worker: Worker): void | Promise<void>
92
93 protected abstract sendToWorker (
94 worker: Worker,
95 message: MessageValue<Data>
96 ): void
97
98 protected addWorker (worker: Worker): void {
99 const previousWorkerIndex = this.tasks.get(worker)
100 if (previousWorkerIndex !== undefined) {
101 this.tasks.set(worker, previousWorkerIndex + 1)
102 } else {
103 throw Error('Worker could not be found in tasks map')
104 }
105 }
106
107 /**
108 * Execute the task specified into the constructor with the data parameter.
109 *
110 * @param data The input for the task specified.
111 * @returns Promise that is resolved when the task is done.
112 */
113 public execute (data: Data): Promise<Response> {
114 // configure worker to handle message with the specified task
115 const worker = this.chooseWorker()
116 this.addWorker(worker)
117 const id = ++this.id
118 const res = this.internalExecute(worker, id)
119 this.sendToWorker(worker, { data: data || ({} as Data), id: id })
120 return res
121 }
122
123 protected abstract registerWorkerMessageListener (
124 port: Worker,
125 listener: (message: MessageValue<Response>) => void
126 ): void
127
128 protected abstract unregisterWorkerMessageListener (
129 port: Worker,
130 listener: (message: MessageValue<Response>) => void
131 ): void
132
133 protected internalExecute (worker: Worker, id: number): Promise<Response> {
134 return new Promise((resolve, reject) => {
135 const listener: (message: MessageValue<Response>) => void = message => {
136 if (message.id === id) {
137 this.unregisterWorkerMessageListener(worker, listener)
138 this.addWorker(worker)
139 if (message.error) reject(message.error)
140 else resolve(message.data as Response)
141 }
142 }
143 this.registerWorkerMessageListener(worker, listener)
144 })
145 }
146
147 protected chooseWorker (): Worker {
148 if (this.workers.length - 1 === this.nextWorker) {
149 this.nextWorker = 0
150 return this.workers[this.nextWorker]
151 } else {
152 this.nextWorker++
153 return this.workers[this.nextWorker]
154 }
155 }
156
157 protected abstract newWorker (): Worker
158
159 protected abstract afterNewWorkerPushed (worker: Worker): void
160
161 protected internalNewWorker (): Worker {
162 const worker: Worker = this.newWorker()
163 worker.on('error', this.opts.errorHandler ?? (() => {}))
164 worker.on('online', this.opts.onlineHandler ?? (() => {}))
165 // TODO handle properly when a worker exit
166 worker.on('exit', this.opts.exitHandler ?? (() => {}))
167 this.workers.push(worker)
168 this.afterNewWorkerPushed(worker)
169 // init tasks map
170 this.tasks.set(worker, 0)
171 return worker
172 }
173 }