repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
docs: refine task stealing documentation
[poolifier.git]
/
src
/
pools
/
abstract-pool.ts
diff --git
a/src/pools/abstract-pool.ts
b/src/pools/abstract-pool.ts
index 4fe0c0388d898adae0222e281e4fb70eb66a4c82..c56beaa6d801843c043f091963cea385b72eccee 100644
(file)
--- a/
src/pools/abstract-pool.ts
+++ b/
src/pools/abstract-pool.ts
@@
-1,6
+1,7
@@
import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import { type TransferListItem } from 'node:worker_threads'
+import type { TransferListItem } from 'node:worker_threads'
+import { type EventEmitter, EventEmitterAsyncResource } from 'node:events'
import type {
MessageValue,
PromiseResponseWrapper,
import type {
MessageValue,
PromiseResponseWrapper,
@@
-22,7
+23,6
@@
import { KillBehaviors } from '../worker/worker-options'
import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
- PoolEmitter,
PoolEvents,
type PoolInfo,
type PoolOptions,
PoolEvents,
type PoolInfo,
type PoolOptions,
@@
-70,7
+70,7
@@
export abstract class AbstractPool<
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
- public
readonly emitter?: PoolEmitter
+ public
emitter?: EventEmitter | EventEmitterAsyncResource
/**
* The task execution response promise map:
/**
* The task execution response promise map:
@@
-133,8
+133,8
@@
export abstract class AbstractPool<
'Cannot start a pool from a worker with the same type as the pool'
)
}
'Cannot start a pool from a worker with the same type as the pool'
)
}
- this.checkNumberOfWorkers(this.numberOfWorkers)
checkFilePath(this.filePath)
checkFilePath(this.filePath)
+ this.checkNumberOfWorkers(this.numberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
@@
-142,7
+142,7
@@
export abstract class AbstractPool<
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.
emitter = new Pool
Emitter()
+ this.
initializeEvent
Emitter()
}
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
Worker,
}
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
Worker,
@@
-261,6
+261,12
@@
export abstract class AbstractPool<
}
}
}
}
+ private initializeEventEmitter (): void {
+ this.emitter = new EventEmitterAsyncResource({
+ name: `poolifier:${this.type}-${this.worker}-pool`
+ })
+ }
+
/** @inheritDoc */
public get info (): PoolInfo {
return {
/** @inheritDoc */
public get info (): PoolInfo {
return {
@@
-938,6
+944,9
@@
export abstract class AbstractPool<
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
+ if (this.emitter instanceof EventEmitterAsyncResource) {
+ this.emitter?.emitDestroy()
+ }
this.started = false
}
this.started = false
}