import type {
IWorker,
IWorkerNode,
- MessageHandler,
WorkerInfo,
WorkerType,
WorkerUsage
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
+ const workerInfo = this.getWorkerInfoByWorker(worker)
+ if (workerInfo.messageChannel != null) {
+ workerInfo.messageChannel?.port1.close()
+ workerInfo.messageChannel?.port1.close()
+ }
this.removeWorkerNode(worker)
})
* @param worker - The worker which should register a listener.
* @param listener - The message listener callback.
*/
- private registerWorkerMessageListener<Message extends Data | Response>(
- worker: Worker,
- listener: (message: MessageValue<Message>) => void
- ): void {
- worker.on('message', listener as MessageHandler<Worker>)
- }
+ protected abstract registerWorkerMessageListener<
+ Message extends Data | Response
+ >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
* Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
protected afterWorkerSetup (worker: Worker): void {
// Listen to worker messages.
this.registerWorkerMessageListener(worker, this.workerListener())
- // Send startup message to worker.
- this.sendWorkerStartupMessage(worker)
+ // Send the startup message to worker.
+ this.sendStartupMessageToWorker(worker)
// Setup worker task statistics computation.
this.setWorkerStatistics(worker)
}
- private sendWorkerStartupMessage (worker: Worker): void {
- this.sendToWorker(worker, {
- ready: false,
- workerId: this.getWorkerInfoByWorker(worker).id as number
- })
- }
+ /**
+ * Sends the startup message to the given worker.
+ *
+ * @param worker - The worker which should receive the startup message.
+ */
+ protected abstract sendStartupMessageToWorker (worker: Worker): void
private redistributeQueuedTasks (workerNodeKey: number): void {
while (this.tasksQueueSize(workerNodeKey) > 0) {
*
* @param worker - The worker.
*/
- private getWorkerInfoByWorker (worker: Worker): WorkerInfo {
+ protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
return this.workerNodes[this.getWorkerNodeKey(worker)].info
}
worker.send(message)
}
+ /** @inheritDoc */
+ protected sendStartupMessageToWorker (worker: Worker): void {
+ this.sendToWorker(worker, {
+ ready: false,
+ workerId: this.getWorkerInfoByWorker(worker).id as number
+ })
+ }
+
+ /** @inheritDoc */
+ protected registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ worker.on('message', listener)
+ }
+
/** @inheritDoc */
protected createWorker (): Worker {
return cluster.fork(this.opts.env)
import {
+ type MessageChannel,
+ type MessagePort,
SHARE_ENV,
Worker,
type WorkerOptions,
/** @inheritDoc */
protected async destroyWorker (worker: Worker): Promise<void> {
this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
+ const workerInfo = this.getWorkerInfoByWorker(worker)
+ workerInfo.messageChannel?.port1.close()
+ workerInfo.messageChannel?.port2.close()
await worker.terminate()
}
/** @inheritDoc */
protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
- worker.postMessage(message)
+ (
+ this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+ ).port1.postMessage(message)
+ }
+
+ /** @inheritDoc */
+ protected sendStartupMessageToWorker (worker: Worker): void {
+ const port2: MessagePort = (
+ this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+ ).port2
+ worker.postMessage(
+ {
+ ready: false,
+ workerId: this.getWorkerInfoByWorker(worker).id as number,
+ port: port2
+ },
+ [port2]
+ )
+ }
+
+ /** @inheritDoc */
+ protected registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void {
+ (
+ this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+ ).port1.on('message', listener)
}
/** @inheritDoc */
+import { MessageChannel } from 'node:worker_threads'
import { CircularArray } from '../circular-array'
import { Queue } from '../queue'
import type { Task } from '../utility-types'
id: this.getWorkerId(worker, workerType),
type: workerType,
dynamic: false,
- ready: false
+ ready: false,
+ ...(workerType === WorkerTypes.thread && {
+ messageChannel: new MessageChannel()
+ })
}
}
+import type { MessageChannel } from 'node:worker_threads'
import type { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
* Ready flag.
*/
ready: boolean
+ /**
+ * Message channel.
+ */
+ messageChannel?: MessageChannel
}
/**
import type { EventLoopUtilization } from 'node:perf_hooks'
+import type { MessagePort } from 'node:worker_threads'
import type { KillBehavior } from './worker/worker-options'
import type { IWorker } from './pools/worker'
* Whether the worker starts or stops its activity check.
*/
readonly checkActive?: boolean
+ /**
+ * Message port.
+ */
+ readonly port?: MessagePort
}
/**
*
* @param type - The type of async event.
* @param isMain - Whether this is the main worker or not.
- * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
* @param mainWorker - Reference to main worker.
+ * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
* @param opts - Options for the worker.
*/
public constructor (
type: string,
protected readonly isMain: boolean,
+ protected readonly mainWorker: MainWorker,
taskFunctions:
| WorkerFunction<Data, Response>
| TaskFunctions<Data, Response>,
- protected readonly mainWorker: MainWorker,
protected readonly opts: WorkerOptions = {
/**
* The kill behavior option on this worker or its default value.
this.checkWorkerOptions(this.opts)
this.checkTaskFunctions(taskFunctions)
if (!this.isMain) {
- this.mainWorker?.on('message', this.messageListener.bind(this))
+ this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this))
}
}
*
* @param message - The received message.
*/
- protected messageListener (message: MessageValue<Data, Data>): void {
+ protected messageListener (message: MessageValue<Data>): void {
if (message.workerId === this.id) {
- if (message.ready != null) {
- // Startup message received
- this.sendReadyResponse()
- } else if (message.statistics != null) {
+ if (message.statistics != null) {
// Statistics message received
this.statistics = message.statistics
} else if (message.checkActive != null) {
}
/**
- * Sends the ready response to the main worker.
+ * Handles the ready message sent by the main worker.
+ *
+ * @param message - The ready message.
*/
- protected sendReadyResponse (): void {
- !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id })
- }
+ protected abstract handleReadyMessage (message: MessageValue<Data>): void
/**
* Starts the worker check active interval.
super(
'worker-cluster-pool:poolifier',
cluster.isPrimary,
- taskFunctions,
cluster.worker as Worker,
+ taskFunctions,
opts
)
+ if (!this.isMain) {
+ this.getMainWorker()?.on('message', this.messageListener.bind(this))
+ }
+ }
+
+ /** @inheritDoc */
+ protected handleReadyMessage (message: MessageValue<Data>): void {
+ if (message.workerId === this.id && message.ready != null) {
+ !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id })
+ }
}
/** @inheritDoc */
Data = unknown,
Response = unknown
> extends AbstractWorker<MessagePort, Data, Response> {
+ /**
+ * Message port used to communicate with the main thread.
+ */
+ private port!: MessagePort
/**
* Constructs a new poolifier thread worker.
*
super(
'worker-thread-pool:poolifier',
isMainThread,
- taskFunctions,
parentPort as MessagePort,
+ taskFunctions,
opts
)
}
+ /** @inheritDoc */
+ protected handleReadyMessage (message: MessageValue<Data>): void {
+ if (
+ message.workerId === this.id &&
+ message.ready != null &&
+ message.port != null
+ ) {
+ if (!this.isMain) {
+ this.port = message.port
+ this.port.on('message', this.messageListener.bind(this))
+ this.sendToMainWorker({ ready: true, workerId: this.id })
+ }
+ }
+ }
+
+ /** @inheritDoc */
protected get id (): number {
return threadId
}
/** @inheritDoc */
protected sendToMainWorker (message: MessageValue<Response>): void {
- this.getMainWorker().postMessage(message)
+ this.port.postMessage(message)
}
/** @inheritDoc */
+const { MessageChannel } = require('worker_threads')
const { expect } = require('expect')
const {
DynamicClusterPool,
id: expect.any(Number),
type: WorkerTypes.thread,
dynamic: false,
- ready: true
+ ready: true,
+ messageChannel: expect.any(MessageChannel)
})
}
})
++numberOfMessagesPosted
}
class SpyWorker extends ThreadWorker {
- getMainWorker () {
- return { postMessage }
+ constructor (fn) {
+ super(fn)
+ this.port = { postMessage }
}
}
expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
})
- it('Verify worker invokes the getMainWorker() and postMessage() methods', () => {
+ it('Verify worker invokes the postMessage() method on port property', () => {
const worker = new SpyWorker(() => {})
worker.sendToMainWorker({ ok: 1 })
expect(numberOfMessagesPosted).toBe(1)