repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
test: refine test naming
[poolifier.git]
/
src
/
pools
/
worker-node.ts
diff --git
a/src/pools/worker-node.ts
b/src/pools/worker-node.ts
index c1c1e0a99b2bc3200e58e85d7c8d1b29aacaa893..d29a6e323f35208e22b6f87ca4610fb0af1b3e34 100644
(file)
--- a/
src/pools/worker-node.ts
+++ b/
src/pools/worker-node.ts
@@
-1,7
+1,7
@@
import { EventEmitter } from 'node:events'
import { MessageChannel } from 'node:worker_threads'
import { EventEmitter } from 'node:events'
import { MessageChannel } from 'node:worker_threads'
-import { Circular
Array } from '../circular-array
.js'
+import { Circular
Buffer } from '../circular-buffer
.js'
import { PriorityQueue } from '../priority-queue.js'
import type { Task } from '../utility-types.js'
import { DEFAULT_TASK_NAME } from '../utils.js'
import { PriorityQueue } from '../priority-queue.js'
import type { Task } from '../utility-types.js'
import { DEFAULT_TASK_NAME } from '../utils.js'
@@
-9,23
+9,23
@@
import {
checkWorkerNodeArguments,
createWorker,
getWorkerId,
checkWorkerNodeArguments,
createWorker,
getWorkerId,
- getWorkerType
+ getWorkerType
,
} from './utils.js'
import {
type EventHandler,
type IWorker,
type IWorkerNode,
} from './utils.js'
import {
type EventHandler,
type IWorker,
type IWorkerNode,
+ MeasurementHistorySize,
type StrategyData,
type WorkerInfo,
type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
type StrategyData,
type WorkerInfo,
type WorkerNodeOptions,
type WorkerType,
WorkerTypes,
- type WorkerUsage
+ type WorkerUsage
,
} from './worker.js'
/**
* Worker node.
} from './worker.js'
/**
* Worker node.
- *
* @typeParam Worker - Type of worker.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
* @typeParam Worker - Type of worker.
* @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
*/
@@
-50,7
+50,6
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
/**
* Constructs a new worker node.
/**
* Constructs a new worker node.
- *
* @param type - The worker type.
* @param filePath - Path to the worker file.
* @param opts - The worker node options.
* @param type - The worker type.
* @param filePath - Path to the worker file.
* @param opts - The worker node options.
@@
-60,7
+59,7
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
checkWorkerNodeArguments(type, filePath, opts)
this.worker = createWorker<Worker>(type, filePath, {
env: opts.env,
checkWorkerNodeArguments(type, filePath, opts)
this.worker = createWorker<Worker>(type, filePath, {
env: opts.env,
- workerOptions: opts.workerOptions
+ workerOptions: opts.workerOptions
,
})
this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
})
this.info = this.initWorkerInfo(this.worker)
this.usage = this.initWorkerUsage()
@@
-69,11
+68,19
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
- this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
+ this.tasksQueue = new PriorityQueue<Task<Data>>(
+ opts.tasksQueueBucketSize,
+ opts.tasksQueuePriority
+ )
this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
+ /** @inheritdoc */
+ public setTasksQueuePriority (enablePriority: boolean): void {
+ this.tasksQueue.enablePriority = enablePriority
+ }
+
/** @inheritdoc */
public tasksQueueSize (): number {
return this.tasksQueue.size
/** @inheritdoc */
public tasksQueueSize (): number {
return this.tasksQueue.size
@@
-213,7
+220,7
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
dynamic: false,
ready: false,
stealing: false,
dynamic: false,
ready: false,
stealing: false,
- backPressure: false
+ backPressure: false
,
}
}
}
}
@@
-236,22
+243,22
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
},
sequentiallyStolen: 0,
stolen: 0,
},
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0
,
},
runTime: {
},
runTime: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
waitTime: {
},
waitTime: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
elu: {
idle: {
},
elu: {
idle: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
active: {
},
active: {
- history: new Circular
Array<number>()
- }
- }
+ history: new Circular
Buffer(MeasurementHistorySize),
+ }
,
+ }
,
}
}
}
}
@@
-279,22
+286,22
@@
export class WorkerNode<Worker extends IWorker, Data = unknown>
},
sequentiallyStolen: 0,
stolen: 0,
},
sequentiallyStolen: 0,
stolen: 0,
- failed: 0
+ failed: 0
,
},
runTime: {
},
runTime: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
waitTime: {
},
waitTime: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
elu: {
idle: {
},
elu: {
idle: {
- history: new Circular
Array<number>()
+ history: new Circular
Buffer(MeasurementHistorySize),
},
active: {
},
active: {
- history: new Circular
Array<number>()
- }
- }
+ history: new Circular
Buffer(MeasurementHistorySize),
+ }
,
+ }
,
}
}
}
}
}
}