- protected internalExecute (worker: Worker, id: number): Promise<Response> {
- return new Promise((resolve, reject) => {
- const listener: (message: MessageValue<Response>) => void = message => {
- if (message.id === id) {
- this.unregisterWorkerMessageListener(worker, listener)
- this.addWorker(worker)
- if (message.error) reject(message.error)
- else resolve(message.data as Response)
+ /**
+ * Registers a listener callback on the given worker.
+ *
+ * @param worker - The worker which should register a listener.
+ * @param listener - The message listener callback.
+ */
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
+
+ /**
+ * Returns a newly created worker.
+ */
+ protected abstract createWorker (): Worker
+
+ /**
+ * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
+ *
+ * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
+ *
+ * @param worker - The newly created worker.
+ */
+ protected abstract afterWorkerSetup (worker: Worker): void
+
+ /**
+ * Creates a new worker and sets it up completely in the pool worker nodes.
+ *
+ * @returns New, completely set up worker.
+ */
+ protected createAndSetupWorker (): Worker {
+ const worker = this.createWorker()
+
+ worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
+ worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+ worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
+ worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
+ worker.once('exit', () => {
+ this.removeWorkerNode(worker)
+ })
+
+ this.pushWorkerNode(worker)
+
+ this.afterWorkerSetup(worker)
+
+ return worker
+ }
+
+ /**
+ * This function is the listener registered for each worker message.
+ *
+ * @returns The listener function to execute when a message is received from a worker.
+ */
+ protected workerListener (): (message: MessageValue<Response>) => void {
+ return message => {
+ if (message.id != null) {
+ // Task execution response received
+ const promiseResponse = this.promiseResponseMap.get(message.id)
+ if (promiseResponse != null) {
+ if (message.error != null) {
+ promiseResponse.reject(message.error)
+ } else {
+ promiseResponse.resolve(message.data as Response)
+ }
+ this.afterTaskExecutionHook(promiseResponse.worker, message)
+ this.promiseResponseMap.delete(message.id)
+ const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+ if (
+ this.opts.enableTasksQueue === true &&
+ this.tasksQueueSize(workerNodeKey) > 0
+ ) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
+ }