Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
4b628b48 21 type TasksQueueOptions
c4855468 22} from './pool'
e102732c
JB
23import type {
24 IWorker,
4b628b48 25 IWorkerNode,
e102732c
JB
26 MessageHandler,
27 Task,
8a1260a3 28 WorkerInfo,
4b628b48 29 WorkerType,
e102732c
JB
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 39import { version } from './version'
4b628b48 40import { WorkerNode } from './worker-node'
23ccf9d7 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
4b628b48 55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
/**
 * Builds a new poolifier pool and immediately creates its worker nodes.
 *
 * The arguments are validated first, then the optional event emitter and the
 * worker choice strategy context are created, the setup hook is run and
 * finally workers are created until `numberOfWorkers` worker nodes exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool. Unset options are given default values while being checked.
 * @throws Error if `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (each check throws on invalid input).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // Fill the pool up to the requested number of worker nodes.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  // Reference point used by the pool utilization computation.
  this.startTimestamp = performance.now()
}
132
/**
 * Checks that a usable worker file path has been given.
 *
 * Values that are not strings (e.g. coming from untyped JavaScript callers)
 * are rejected as well, instead of silently passing the check.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`/`undefined`, is not a string, or is empty once trimmed.
 */
private checkFilePath (filePath: string): void {
  if (
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  ) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
/**
 * Checks the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws Error if the number of workers is `null`/`undefined`.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // The pool type is only consulted once the value is known to be a non negative integer.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
159
/**
 * Checks the size boundaries of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min` is greater than `max`, if both are zero, or if they are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Checked before the generic equality case so that the more specific message wins.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
177
/**
 * Checks the pool options and stores the effective values in `this.opts`.
 *
 * Defaults applied to unset options: round robin worker choice strategy,
 * default worker choice strategy options, `restartWorkerOnError` enabled,
 * `enableEvents` enabled and `enableTasksQueue` disabled.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
204
/**
 * Checks that the given worker choice strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
214
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their number differs from the pool maximum size,
 * or if the measurement is not one of the values of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const hasWeights = workerChoiceStrategyOptions.weights != null
  if (
    hasWeights &&
    Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
  ) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const hasMeasurement = workerChoiceStrategyOptions.measurement != null
  if (
    hasMeasurement &&
    !Object.values(Measurements).includes(
      workerChoiceStrategyOptions.measurement
    )
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
242
/**
 * Checks the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are defined but not a plain object,
 * or if the concurrency is defined but not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate when no concurrency is given.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
266
/**
 * @inheritDoc
 *
 * Implementation notes: `utilization`, `runTime` and `waitTime` are only
 * part of the result when the current worker choice strategy requires the
 * matching aggregated measurements. The run time and wait time averages are
 * the aggregated time divided by the number of executed tasks, and are `0`
 * (instead of the `NaN` produced by a division by zero) while no task has
 * been executed.
 */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a numeric value picked from each worker node.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Counts the worker nodes matching the given predicate.
  const countWhere = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )

  const executedTasks = sumOver(workerNode => workerNode.usage.tasks.executed)

  // Builds the pool wide statistics of one time measurement.
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime',
    withMedian: boolean
  ): NonNullable<PoolInfo['runTime']> => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      // Guard against 0 / 0 when the pool has not executed any task yet.
      executedTasks > 0
        ? sumOver(
          workerNode => workerNode.usage[measurement]?.aggregate ?? 0
        ) / executedTasks
        : 0
    ),
    ...(withMedian && {
      median: round(
        median(
          this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks,
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(workerNode => workerNode.usage.tasks.maxQueued),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime', requirements.runTime.median)
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime', requirements.waitTime.median)
    })
  }
}
08f3f44c 403
/**
 * Whether the pool is still starting: it is not full yet, or at least one
 * of its worker nodes is not flagged as ready.
 */
private get starting (): boolean {
  if (!this.full) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
410
/**
 * Whether the pool is ready: it is full and every worker node is flagged as ready.
 */
private get ready (): boolean {
  if (!this.full) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
416
/**
 * Gets the approximate pool utilization.
 *
 * It is the sum of the aggregated tasks run time and wait time of all worker
 * nodes, divided by the pool run time capacity, i.e. the time elapsed since
 * the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
437
8881ae32
JB
438 /**
439 * Pool type.
440 *
441 * If it is `'dynamic'`, it provides the `max` property.
442 */
443 protected abstract get type (): PoolType
444
184855e6
JB
445 /**
446 * Gets the worker type.
447 */
448 protected abstract get worker (): WorkerType
449
c2ade475 450 /**
6b27d407 451 * Pool minimum size.
c2ade475 452 */
6b27d407 453 protected abstract get minSize (): number
ff733df7
JB
454
455 /**
6b27d407 456 * Pool maximum size.
ff733df7 457 */
6b27d407 458 protected abstract get maxSize (): number
a35560ba 459
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
470
/**
 * Checks that the worker id carried by a message, if any, belongs to a worker of the pool.
 *
 * @param message - The message received from a worker.
 * @throws Error if the message worker id does not match any pool worker.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without a worker id are not checked.
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
481
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The index of the worker node holding the worker, `-1` if the worker is not in the pool worker nodes.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
493
/**
 * @inheritDoc
 *
 * Implementation notes: the strategy is validated before being stored in
 * the pool options and handed to the worker choice strategy context. The
 * usage of every worker node is then reset and its statistics set up again.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
512
/**
 * @inheritDoc
 *
 * Implementation notes: the options are validated, stored in the pool
 * options and then passed to the worker choice strategy context.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
523
/**
 * @inheritDoc
 *
 * Implementation notes: the tasks queues are flushed when a previously
 * enabled tasks queue gets disabled, before the new state is stored and the
 * tasks queue options are applied.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
535
/**
 * @inheritDoc
 *
 * Implementation notes: when the tasks queue is enabled the options are
 * validated and rebuilt with defaults, otherwise any stored tasks queue
 * options are removed from the pool options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
546
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly `null`/`undefined`.
 * @returns Tasks queue options whose concurrency defaults to `1` when not given.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
554
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 563
c319c66b
JB
564 /**
565 * Whether the pool is busy or not.
566 *
567 * The pool busyness boolean status.
568 */
569 protected abstract get busy (): boolean
7c0ba920 570
/**
 * Whether worker nodes are executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
583
/**
 * @inheritDoc
 *
 * Implementation notes: a worker node is chosen, the task is given a random
 * UUID and its promise callbacks are stored under that id in the promise
 * response map. The task is enqueued when the tasks queue is enabled and
 * either the pool is busy or the chosen worker node already executes as
 * many tasks as the tasks queue concurrency allows; it is executed right
 * away otherwise.
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  const responsePromise = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 618
/**
 * @inheritDoc
 *
 * Implementation notes: for every worker node, its tasks queue is flushed,
 * the worker is destroyed and its `'exit'` event is awaited. All worker
 * nodes are handled concurrently.
 */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before the worker destruction is requested.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(
    this.workerNodes.map(
      async (workerNode, workerNodeKey) =>
        await destroyWorkerNode(workerNode, workerNodeKey)
    )
  )
}
635
4a6952ff 636 /**
6c6afb84 637 * Terminates the given worker.
4a6952ff 638 *
f06e48d8 639 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
640 */
641 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 642
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * The default implementation does nothing; it can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 652
729c563d 653 /**
280c2a77
S
654 * Should return whether the worker is the main worker or not.
655 */
656 protected abstract isMain (): boolean
657
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing++
  this.updateWaitTimeWorkerUsage(usage, task)
}
673
/**
 * Hook executed after the worker task execution.
 * Updates the tasks counters, the run time and the ELU usage of the worker node holding the worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
690
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * The executing counter is decremented, then the failed counter is
 * incremented if the message carries a task error, the executed counter otherwise.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the ended task.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  if (message.taskError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
703
/**
 * Updates the run time statistics of a worker usage from a task response message.
 *
 * Nothing is updated unless the worker choice strategy requires the run
 * time aggregate. The average and the median are only updated when they are
 * required as well.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the ended task.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const measuredRunTime = message.taskPerformance?.runTime
  // A missing measurement counts as a run time of zero.
  const taskRunTime = measuredRunTime ?? 0
  const { runTime } = workerUsage
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  if (runTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    runTime.average = runTime.aggregate / workerUsage.tasks.executed
  }
  // The history only receives actual measurements.
  if (runTimeRequirements.median && measuredRunTime != null) {
    runTime.history.push(measuredRunTime)
    runTime.median = median(runTime.history)
  }
}
741
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 *
 * The task wait time is the time elapsed since the task timestamp, or zero
 * when the task has no timestamp. Nothing is updated unless the worker
 * choice strategy requires the wait time aggregate. The average and the
 * median are only updated when they are required as well.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime } = workerUsage
  waitTime.aggregate = (waitTime.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  if (waitTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    waitTime.average = waitTime.aggregate / workerUsage.tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
779
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task response message.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU
 * aggregate and the message carries an ELU measurement. The stored
 * utilization becomes the mean of its previous value and the task one, or
 * the task one when there is no previous value. The averages and the
 * medians are only updated when they are required as well.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received for the ended task.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const { elu } = workerUsage
  elu.idle.aggregate = (elu.idle.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active.aggregate ?? 0) + taskElu.active
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active.maximum ?? -Infinity
  )
  if (eluRequirements.average && workerUsage.tasks.executed !== 0) {
    const executedTasks = workerUsage.tasks.executed
    elu.idle.average = elu.idle.aggregate / executedTasks
    elu.active.average = elu.active.aggregate / executedTasks
  }
  if (eluRequirements.median) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
843
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * worker node is chosen if the strategy policy says to use the dynamic
 * worker. In every other case the choice is delegated to the worker choice
 * strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorker = this.createAndSetupDynamicWorker()
    const { useDynamicWorker } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (useDynamicWorker) {
      return this.getWorkerNodeKey(dynamicWorker)
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
862
/**
 * Conditions for dynamic worker creation: the pool is dynamic, it is not
 * full and `internalBusy()` reports that the worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
871
280c2a77 872 /**
675bb809 873 * Sends a message to the given worker.
280c2a77 874 *
38e795c1
JB
875 * @param worker - The worker which should receive the message.
876 * @param message - The message.
280c2a77
S
877 */
878 protected abstract sendToWorker (
879 worker: Worker,
880 message: MessageValue<Data>
881 ): void
882
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 895
729c563d 896 /**
41344292 897 * Creates a new worker.
6c6afb84
JB
898 *
899 * @returns Newly created worker.
729c563d 900 */
280c2a77 901 protected abstract createWorker (): Worker
c97c7edb 902
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * The default implementation registers the pool message listener on the
 * worker, sends it a startup message carrying its worker id and sets up its
 * statistics.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.sendToWorker(worker, {
    ready: false,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
c97c7edb 920
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user supplied handlers (or no-op functions) are registered on the
 * worker events, together with the pool own error and exit handling. The
 * worker is then pushed to the worker nodes and `afterWorkerSetup()` is called.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    // Forward the error to the pool emitter, if there is one.
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    // No replacement worker unless restart is enabled and the pool has finished starting.
    if (this.opts.restartWorkerOnError !== true || this.starting) {
      return
    }
    const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    if (workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 958
7d2e3f59
JB
/**
 * Moves the tasks queued on the given worker's node to the tasks queues of the other worker nodes.
 *
 * For each queued task, the target is the first other worker node with an empty tasks queue or,
 * failing that, the other worker node with the fewest queued tasks.
 * Nothing is moved if the worker is not registered in the pool worker nodes or
 * if there is no other worker node to move the tasks to.
 *
 * @param worker - The worker whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // The worker is no longer registered: there is no tasks queue to read from.
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty tasks queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other worker node is available: re-enqueueing the task on the same
      // worker node would never drain its tasks queue and loop forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
986
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular worker setup, a listener handling the kill messages sent by the worker
 * is registered, the worker is flagged as dynamic and it is asked to check its aliveness.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const mustDestroyWorker = (): boolean => {
      // A hard kill is honored unconditionally.
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      // Any other kill request is honored only once the worker has nothing left to do.
      switch (this.opts.enableTasksQueue) {
        case false:
          return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
        case true:
          return (
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0
          )
        default:
          return false
      }
    }
    if (mustDestroyWorker()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  this.sendToWorker(worker, {
    checkAlive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
1017
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the message:
 * worker ready messages and task execution responses are handled, anything else is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const isWorkerReadyMessage =
      message.ready != null && message.workerId != null
    if (isWorkerReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1035
/**
 * Handles a worker ready message: records the ready status of the emitting worker and
 * emits the pool ready event once the pool itself is ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId) as Worker
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(readyWorker)
  )
  readyWorkerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1044
/**
 * Handles a task execution response: settles the promise of the matching task,
 * runs the after task execution hook and, if tasks queuing is enabled,
 * starts the next task queued on the worker node.
 *
 * Responses without a matching pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker just completed a task: feed it with the next one from its queue.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1071
/**
 * Emits the pool busy event when the pool is busy and
 * the pool full event when the pool is a dynamic one and is full.
 * Does nothing without a pool event emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1082
8a1260a3
JB
/**
 * Gets the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information stored in the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1091
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
c923ce56 1101
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1114
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1119
/**
 * Adds the given task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1123
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1127
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1131
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let pendingTasks = this.tasksQueueSize(workerNodeKey)
  while (pendingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    // Re-read the size: the queue is the source of truth for the loop termination.
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1141
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1147
/**
 * Sends to the given worker the task statistics it has to compute,
 * as required by the worker choice strategy context.
 *
 * @param worker - The worker receiving the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    },
    workerId
  })
}
c97c7edb 1160}