import { MessageChannel } from 'node:worker_threads'
+import { EventEmitter } from 'node:events'
import { CircularArray } from '../circular-array'
import type { Task } from '../utility-types'
import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
import { Deque } from '../deque'
import {
+ type ErrorHandler,
+ type ExitHandler,
type IWorker,
type IWorkerNode,
+ type MessageHandler,
+ type OnlineHandler,
type StrategyData,
type WorkerInfo,
- type WorkerNodeEventDetail,
+ type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
type WorkerUsage
} from './worker'
-import { checkWorkerNodeArguments } from './utils'
+import { checkWorkerNodeArguments, createWorker } from './utils'
/**
* Worker node.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
export class WorkerNode<Worker extends IWorker, Data = unknown>
- extends EventTarget
+ extends EventEmitter
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public readonly worker: Worker
/**
* Constructs a new worker node.
*
- * @param worker - The worker.
- * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
+ * @param type - The worker type.
+ * @param filePath - Path to the worker file.
+ * @param opts - The worker node options.
*/
- constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+ constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
super()
- checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
- this.worker = worker
- this.info = this.initWorkerInfo(worker)
+ checkWorkerNodeArguments(type, filePath, opts)
+ this.worker = createWorker<Worker>(type, filePath, {
+ env: opts.env,
+ workerOptions: opts.workerOptions
+ })
+ this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
if (this.info.type === WorkerTypes.thread) {
this.messageChannel = new MessageChannel()
}
- this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+ this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
this.onBackPressureStarted = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
const tasksQueueSize = this.tasksQueue.push(task)
if (this.hasBackPressure() && !this.onBackPressureStarted) {
this.onBackPressureStarted = true
- this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('backPressure', {
- detail: { workerId: this.info.id as number }
- })
- )
+ this.emit('backPressure', { workerId: this.info.id as number })
this.onBackPressureStarted = false
}
return tasksQueueSize
const tasksQueueSize = this.tasksQueue.unshift(task)
if (this.hasBackPressure() && !this.onBackPressureStarted) {
this.onBackPressureStarted = true
- this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('backPressure', {
- detail: { workerId: this.info.id as number }
- })
- )
+ this.emit('backPressure', { workerId: this.info.id as number })
this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
- public closeChannel (): void {
- if (this.messageChannel != null) {
- this.messageChannel.port1.unref()
- this.messageChannel.port2.unref()
- this.messageChannel.port1.close()
- this.messageChannel.port2.close()
- delete this.messageChannel
+ public async terminate (): Promise<void> {
+ const waitWorkerExit = new Promise<void>(resolve => {
+ this.registerOnceWorkerEventHandler('exit', () => {
+ resolve()
+ })
+ })
+ this.closeMessageChannel()
+ this.removeAllListeners()
+ if (this.info.type === WorkerTypes.thread) {
+ await this.worker.terminate?.()
+ } else if (this.info.type === WorkerTypes.cluster) {
+ this.registerOnceWorkerEventHandler('disconnect', () => {
+ this.worker.kill?.()
+ })
+ this.worker.disconnect?.()
}
+ await waitWorkerExit
+ }
+
+ /** @inheritdoc */
+ public registerWorkerEventHandler (
+ event: string,
+ handler:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ): void {
+ this.worker.on(event, handler)
+ }
+
+ /** @inheritdoc */
+ public registerOnceWorkerEventHandler (
+ event: string,
+ handler:
+ | OnlineHandler<Worker>
+ | MessageHandler<Worker>
+ | ErrorHandler<Worker>
+ | ExitHandler<Worker>
+ ): void {
+ this.worker.once(event, handler)
}
/** @inheritdoc */
return this.taskFunctionsUsage.delete(name)
}
+ private closeMessageChannel (): void {
+ if (this.messageChannel != null) {
+ this.messageChannel.port1.unref()
+ this.messageChannel.port2.unref()
+ this.messageChannel.port1.close()
+ this.messageChannel.port2.close()
+ delete this.messageChannel
+ }
+ }
+
private initWorkerInfo (worker: Worker): WorkerInfo {
return {
id: getWorkerId(worker),