Only allow primitive JSON for transfer between worker and main worker (#128)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb
S
1import EventEmitter from 'events'
2import type { MessageValue } from '../utility-types'
3import type { IPool } from './pool'
4
/**
 * Callback for a worker `error` event.
 *
 * Called with the worker bound as `this` and the error as sole argument.
 */
export type ErrorHandler<W> = (this: W, e: Error) => void

/**
 * Callback for a worker `online` event.
 *
 * Called with the worker bound as `this` and no arguments.
 */
export type OnlineHandler<W> = (this: W) => void

/**
 * Callback for a worker `exit` event.
 *
 * Called with the worker bound as `this` and a numeric `code`.
 */
export type ExitHandler<W> = (this: W, code: number) => void
8
/**
 * Minimal event-subscription surface a worker must expose so that a pool can
 * attach its `error`, `online` and `exit` handlers to it.
 */
export interface IWorker {
  /** Registers `handler` for the worker's `error` event. */
  on (event: 'error', handler: ErrorHandler<this>): void

  /** Registers `handler` for the worker's `online` event. */
  on (event: 'online', handler: OnlineHandler<this>): void

  /** Registers `handler` for the worker's `exit` event. */
  on (event: 'exit', handler: ExitHandler<this>): void
}
14
/**
 * Optional settings accepted by a pool.
 */
export interface PoolOptions<W> {
  /**
   * Listener attached to the `error` event of each worker.
   */
  errorHandler?: ErrorHandler<W>

  /**
   * Listener attached to the `online` event of each worker.
   */
  onlineHandler?: OnlineHandler<W>

  /**
   * Listener attached to the `exit` event of each worker.
   */
  exitHandler?: ExitHandler<W>

  /**
   * Value used to set `maxListeners` on event emitters (workers are event
   * emitters). Its only purpose is to avoid unhelpful warning messages.
   *
   * @default 1000
   */
  maxTasks?: number
}
35
/**
 * Event emitter type exposed by a pool through its `emitter` property.
 * Adds nothing on top of `EventEmitter`.
 */
class PoolEmitter extends EventEmitter {
}
37
/**
 * Base class holding the logic shared by pool implementations: worker
 * creation, worker selection, task bookkeeping and request/response matching.
 *
 * Subclasses supply the worker-specific parts (how a worker is created, how a
 * message is sent to it, how message listeners are attached and detached).
 *
 * @template Worker Type of the workers managed by the pool.
 * @template Data Type of the data sent to a worker.
 * @template Response Type of the value a task resolves with.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /** Workers created by this pool, in creation order. */
  public readonly workers: Worker[] = []

  /** Index in `workers` of the worker most recently returned by `chooseWorker`. */
  public nextWorker: number = 0

  /**
   * Worker as key and the number of tasks currently running on it as value.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** Emitter exposed by the pool. */
  public readonly emitter: PoolEmitter

  /** Id given to the last submitted task; incremented for every `execute` call. */
  protected id: number = 0

  /**
   * @param numWorkers Number of workers created by the constructor.
   * @param filePath File with the worker implementation. Must not be empty.
   * @param opts Pool options.
   * @throws Error when `isMain()` returns `false` or when `filePath` is falsy.
   */
  public constructor (
    public readonly numWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    // Created before any hook runs and before any worker is built, so that
    // subclass code executed from this constructor never sees it undefined.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numWorkers; i++) {
      this.internalNewWorker()
    }
  }

  /**
   * Hook called once by the constructor, before the workers are created.
   * Does nothing by default.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Tells whether the current context is allowed to start a pool.
   * The constructor throws when this returns `false`.
   */
  protected abstract isMain (): boolean

  /**
   * Destroys every worker of the pool, one after the other.
   */
  public async destroy (): Promise<void> {
    for (const worker of this.workers) {
      await this.destroyWorker(worker)
    }
  }

  /**
   * Destroys a single worker.
   *
   * @param worker The worker to destroy.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Sends a message to the given worker.
   *
   * @param worker The worker that receives the message.
   * @param message The message to send.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Increments the number of running tasks recorded for `worker`.
   *
   * @param worker The worker that is about to receive a task.
   * @throws Error when `worker` has no entry in the `tasks` map.
   */
  protected addWorker (worker: Worker): void {
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, runningTasks + 1)
  }

  /**
   * Decrements the number of running tasks recorded for `worker`.
   * Counterpart of `addWorker`, called once a task has completed.
   *
   * @param worker The worker that has just finished a task.
   * @throws Error when `worker` has no entry in the `tasks` map.
   */
  protected decreaseWorkerTasks (worker: Worker): void {
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, runningTasks - 1)
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified.
   * @returns Promise that is resolved when the task is done.
   */
  public execute (data: Data): Promise<Response> {
    // configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.addWorker(worker)
    const id = ++this.id
    // The listener is registered before the message is sent.
    const res = this.internalExecute(worker, id)
    // Only `null` / `undefined` fall back to an empty object: falsy payloads
    // such as `0`, `''` or `false` are valid data and must be sent as is.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id })
    return res
  }

  /**
   * Attaches a message listener to the given worker.
   *
   * @param port The worker to listen to.
   * @param listener Callback invoked with each message.
   */
  protected abstract registerWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Detaches a message listener previously attached to the given worker.
   *
   * @param port The worker to stop listening to.
   * @param listener The callback to remove.
   */
  protected abstract unregisterWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Waits for the message carrying the given task id.
   *
   * Messages with another id are ignored. On the matching message the
   * listener is removed, the worker's running-task count is decremented and
   * the promise is settled.
   *
   * @param worker The worker running the task.
   * @param id Id of the task to wait for.
   * @returns Promise rejected with `message.error` when it is set, otherwise
   * resolved with `message.data`.
   */
  protected internalExecute (worker: Worker, id: number): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id === id) {
          this.unregisterWorkerMessageListener(worker, listener)
          // The task is over: release the slot taken in `execute`.
          this.decreaseWorkerTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Picks the worker that receives the next task, cycling through `workers`.
   *
   * The index is advanced before it is read, so with more than one worker the
   * first worker returned is `workers[1]`.
   *
   * @returns The chosen worker.
   */
  protected chooseWorker (): Worker {
    // `>=` rather than `===` so an index that ended up past the last worker
    // wraps back to 0 instead of growing forever.
    if (this.nextWorker >= this.workers.length - 1) {
      this.nextWorker = 0
    } else {
      this.nextWorker++
    }
    return this.workers[this.nextWorker]
  }

  /**
   * Creates a new worker.
   *
   * @returns The newly created worker.
   */
  protected abstract newWorker (): Worker

  /**
   * Hook called right after a new worker has been pushed to `workers`.
   *
   * @param worker The worker that has just been added.
   */
  protected abstract afterNewWorkerPushed (worker: Worker): void

  /**
   * Creates a worker, attaches the handlers from the pool options (a no-op
   * when an option is not provided), stores it in `workers` and initializes
   * its running-task count to 0.
   *
   * @returns The newly created worker.
   */
  protected internalNewWorker (): Worker {
    const worker: Worker = this.newWorker()
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a worker exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    this.workers.push(worker)
    this.afterNewWorkerPushed(worker)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}