import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import { existsSync } from 'node:fs'
+import { type TransferListItem } from 'node:worker_threads'
import type {
MessageValue,
PromiseResponseWrapper,
protected abstract get busy (): boolean
/**
- * Whether worker nodes are executing at least one task.
+ * Whether worker nodes are executing concurrently their tasks quota or not.
*
* @returns Worker nodes busyness boolean status.
*/
protected internalBusy (): boolean {
- return (
- this.workerNodes.findIndex(
- workerNode =>
- workerNode.info.ready && workerNode.usage.tasks.executing === 0
- ) === -1
- )
+ if (this.opts.enableTasksQueue === true) {
+ return (
+ this.workerNodes.findIndex(
+ workerNode =>
+ workerNode.info.ready &&
+ workerNode.usage.tasks.executing <
+ (this.opts.tasksQueueOptions?.concurrency as number)
+ ) === -1
+ )
+ } else {
+ return (
+ this.workerNodes.findIndex(
+ workerNode =>
+ workerNode.info.ready && workerNode.usage.tasks.executing === 0
+ ) === -1
+ )
+ }
}
/** @inheritDoc */
- public async execute (data?: Data, name?: string): Promise<Response> {
+ public async execute (
+ data?: Data,
+ name?: string,
+ transferList?: TransferListItem[]
+ ): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
+ if (name != null && typeof name !== 'string') {
+ reject(new TypeError('name argument must be a string'))
+ }
+ if (transferList != null && !Array.isArray(transferList)) {
+ reject(new TypeError('transferList argument must be an array'))
+ }
const timestamp = performance.now()
const workerNodeKey = this.chooseWorkerNode()
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ transferList,
timestamp,
workerId: this.getWorkerInfo(workerNodeKey).id as number,
taskId: randomUUID()
*
* @param workerNodeKey - The worker node key.
* @param message - The message.
+ * @param transferList - The optional array of transferable objects.
*/
protected abstract sendToWorker (
workerNodeKey: number,
- message: MessageValue<Data>
+ message: MessageValue<Data>,
+ transferList?: TransferListItem[]
): void
/**
*/
private executeTask (workerNodeKey: number, task: Task<Data>): void {
this.beforeTaskExecutionHook(workerNodeKey, task)
- this.sendToWorker(workerNodeKey, task)
+ this.sendToWorker(workerNodeKey, task, task.transferList)
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {