Commit | Line | Data |
---|---|---|
f98fbdb9 JB |
1 | import { Worker, WorkerOptions } from 'worker_threads'; |
2 | ||
6af9012e | 3 | import Configuration from '../utils/Configuration'; |
5fdab605 | 4 | import Constants from '../utils/Constants'; |
3f40bc9c | 5 | import Pool from 'worker-threads-pool'; |
ad3de6c4 | 6 | import WorkerData from '../types/WorkerData'; |
7dde0b73 | 7 | |
3f40bc9c | 8 | export default class Wrk { |
ad3de6c4 JB |
9 | private _workerScript: string; |
10 | private _workerData: WorkerData; | |
f98fbdb9 | 11 | private _index: number; |
97ed5cec | 12 | private _maxWorkerElements: number; |
3d2ff9e4 | 13 | private _worker: Worker; |
6af9012e | 14 | |
7dde0b73 JB |
15 | /** |
16 | * Create a new `Wrk`. | |
17 | * | |
ad3de6c4 JB |
18 | * @param {string} workerScript |
19 | * @param {WorkerData} workerData | |
97ed5cec | 20 | * @param {number} maxWorkerElements |
7dde0b73 | 21 | */ |
97ed5cec | 22 | constructor(workerScript: string, workerData: WorkerData, maxWorkerElements = 1) { |
6798437b | 23 | this._workerData = workerData; |
f98fbdb9 | 24 | this._index = workerData.index; |
6798437b | 25 | this._workerScript = workerScript; |
7dde0b73 | 26 | if (Configuration.useWorkerPool()) { |
97ed5cec | 27 | WorkerPool.maxConcurrentWorkers = Configuration.getWorkerPoolSize(); |
7dde0b73 | 28 | } |
97ed5cec | 29 | this._maxWorkerElements = maxWorkerElements; |
6798437b JB |
30 | } |
31 | ||
32 | /** | |
ad3de6c4 JB |
33 | * @return {number} |
34 | * @public | |
6798437b | 35 | */ |
97ed5cec JB |
36 | public get maxWorkerElements(): number { |
37 | return this._maxWorkerElements; | |
7dde0b73 JB |
38 | } |
39 | ||
6af9012e JB |
40 | /** |
41 | * | |
42 | * @return {Promise} | |
43 | * @public | |
44 | */ | |
3d2ff9e4 | 45 | async start(): Promise<Worker> { |
6af9012e | 46 | if (Configuration.useWorkerPool()) { |
5fdab605 | 47 | await this._startWorkerWithPool(); |
3d2ff9e4 | 48 | } else { |
5fdab605 | 49 | await this._startWorker(); |
6af9012e | 50 | } |
3d2ff9e4 J |
51 | return this._worker; |
52 | } | |
53 | ||
5fdab605 | 54 | /** |
3d2ff9e4 | 55 | * |
5fdab605 | 56 | * @return {void} |
3d2ff9e4 J |
57 | * @public |
58 | */ | |
97ed5cec JB |
59 | addWorkerElement(workerData: WorkerData): void { |
60 | // FIXME: also forbid to add an element if the current number of elements > max number of elements | |
61 | if (Configuration.useWorkerPool()) { | |
62 | return; | |
63 | } | |
3d2ff9e4 J |
64 | this._workerData = workerData; |
65 | this._index = workerData.index; | |
97ed5cec | 66 | this._worker.postMessage({ id : Constants.START_WORKER_ELEMENT, workerData: workerData }); |
6af9012e JB |
67 | } |
68 | ||
7dde0b73 JB |
69 | /** |
70 | * | |
71 | * @return {Promise} | |
72 | * @private | |
73 | */ | |
6af9012e | 74 | private async _startWorkerWithPool() { |
7dde0b73 | 75 | return new Promise((resolve, reject) => { |
f98fbdb9 | 76 | WorkerPool.acquire(this._workerScript, { workerData: this._workerData }, (err, worker) => { |
7dde0b73 JB |
77 | if (err) { |
78 | return reject(err); | |
79 | } | |
80 | worker.once('message', resolve); | |
81 | worker.once('error', reject); | |
3d2ff9e4 | 82 | this._worker = worker; |
7dde0b73 JB |
83 | }); |
84 | }); | |
85 | } | |
86 | ||
87 | /** | |
88 | * | |
89 | * @return {Promise} | |
90 | * @private | |
91 | */ | |
6af9012e | 92 | private async _startWorker() { |
7dde0b73 | 93 | return new Promise((resolve, reject) => { |
6af9012e | 94 | const worker = new Worker(this._workerScript, { workerData: this._workerData }); |
7dde0b73 JB |
95 | worker.on('message', resolve); |
96 | worker.on('error', reject); | |
97 | worker.on('exit', (code) => { | |
98 | if (code !== 0) { | |
f98fbdb9 | 99 | reject(new Error(`Worker id ${this._index} stopped with exit code ${code}`)); |
7dde0b73 JB |
100 | } |
101 | }); | |
3d2ff9e4 | 102 | this._worker = worker; |
7dde0b73 JB |
103 | }); |
104 | } | |
7dde0b73 | 105 | } |
f98fbdb9 JB |
106 | |
107 | class WorkerPool { | |
97ed5cec | 108 | public static maxConcurrentWorkers: number; |
f98fbdb9 JB |
109 | private static _instance: Pool; |
110 | ||
111 | private constructor() { } | |
112 | ||
113 | public static getInstance(): Pool { | |
97ed5cec JB |
114 | if (!WorkerPool._instance) { |
115 | WorkerPool._instance = new Pool({ max: WorkerPool.maxConcurrentWorkers }); | |
f98fbdb9 JB |
116 | } |
117 | return WorkerPool._instance; | |
118 | } | |
119 | ||
120 | public static acquire(filename: string, options: WorkerOptions, callback: (error: Error | null, worker: Worker) => void): void { | |
121 | WorkerPool.getInstance().acquire(filename, options, callback); | |
122 | } | |
123 | } |