-import type { MessageValue } from '../utility-types'
+import type {
+ MessageValue,
+ PromiseWorkerResponseWrapper
+} from '../utility-types'
import type { IPoolInternal } from './pool-internal'
import { PoolEmitter } from './pool-internal'
import type { WorkerChoiceStrategy } from './selection-strategies'
Data = unknown,
Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
+ /**
+ * The promise map.
+ *
+ * - `key`: This is the message ID of each submitted task.
+ * - `value`: An object that contains the worker, the resolve function and the reject function.
+ *
+ * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
+ */
+ protected promiseMap: Map<
+ number,
+ PromiseWorkerResponseWrapper<Worker, Response>
+ > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
+
/** @inheritdoc */
public readonly workers: Worker[] = []
Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
- protected abstract unregisterWorkerMessageListener<
- Message extends Data | Response
- > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
-
protected internalExecute (
worker: Worker,
messageId: number
): Promise<Response> {
- return new Promise((resolve, reject) => {
- const listener: (message: MessageValue<Response>) => void = message => {
- if (message.id === messageId) {
- this.unregisterWorkerMessageListener(worker, listener)
- this.decreaseWorkersTasks(worker)
- if (message.error) reject(message.error)
- else resolve(message.data as Response)
- }
- }
- this.registerWorkerMessageListener(worker, listener)
+ return new Promise<Response>((resolve, reject) => {
+ this.promiseMap.set(messageId, { resolve, reject, worker })
})
}
return worker
}
+
+ /**
+ * This function is the listener registered for each worker.
+ *
+ * @returns The listener function to execute when a message is sent from a worker.
+ */
+ protected workerListener (): (message: MessageValue<Response>) => void {
+ const listener: (message: MessageValue<Response>) => void = message => {
+ if (message.id) {
+ const value = this.promiseMap.get(message.id)
+ if (value) {
+ this.decreaseWorkersTasks(value.worker)
+ if (message.error) value.reject(message.error)
+ else value.resolve(message.data as Response)
+ this.promiseMap.delete(message.id)
+ }
+ }
+ }
+ return listener
+ }
}