1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
5 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
7 describe('Fixed thread pool test suite', () => {
8 const numberOfThreads
= 6
9 const tasksConcurrency
= 2
10 const pool
= new FixedThreadPool(
12 './tests/worker-files/thread/testWorker.js',
14 errorHandler
: e
=> console
.error(e
)
17 const queuePool
= new FixedThreadPool(
19 './tests/worker-files/thread/testWorker.js',
21 enableTasksQueue
: true,
23 concurrency
: tasksConcurrency
25 errorHandler
: e
=> console
.error(e
)
28 const emptyPool
= new FixedThreadPool(
30 './tests/worker-files/thread/emptyWorker.js',
31 { exitHandler
: () => console
.info('empty pool worker exited') }
33 const echoPool
= new FixedThreadPool(
35 './tests/worker-files/thread/echoWorker.js'
37 const errorPool
= new FixedThreadPool(
39 './tests/worker-files/thread/errorWorker.js',
41 errorHandler
: e
=> console
.error(e
)
44 const asyncErrorPool
= new FixedThreadPool(
46 './tests/worker-files/thread/asyncErrorWorker.js',
48 errorHandler
: e
=> console
.error(e
)
51 const asyncPool
= new FixedThreadPool(
53 './tests/worker-files/thread/asyncWorker.js'
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool
.destroy()
59 await asyncPool
.destroy()
60 await errorPool
.destroy()
61 await asyncErrorPool
.destroy()
62 await emptyPool
.destroy()
63 await queuePool
.destroy()
66 it('Verify that the function is executed in a worker thread', async () => {
67 let result
= await pool
.execute({
68 function: TaskFunctions
.fibonacci
70 expect(result
).toBe(75025)
71 result
= await pool
.execute({
72 function: TaskFunctions
.factorial
74 expect(result
).toBe(9.33262154439441e157
)
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result
= await pool
.execute()
79 expect(result
).toStrictEqual({ ok
: 1 })
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js',
87 errorHandler
: e
=> console
.error(e
)
91 pool
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
92 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
93 expect(poolReady
).toBe(1)
97 it("Verify that 'busy' event is emitted", async () => {
98 const promises
= new Set()
100 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
101 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
102 promises
.add(pool
.execute())
104 await Promise
.all(promises
)
105 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
106 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
107 expect(poolBusy
).toBe(numberOfThreads
+ 1)
110 it('Verify that tasks queuing is working', async () => {
111 const promises
= new Set()
112 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
113 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
114 promises
.add(queuePool
.execute())
116 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
117 for (const workerNode
of queuePool
.workerNodes
) {
118 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
119 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
120 queuePool
.opts
.tasksQueueOptions
.concurrency
122 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
123 expect(workerNode
.usage
.tasks
.queued
).toBe(
124 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
126 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
127 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
129 expect(workerNode
.usage
.tasks
.stolen
).toBe(0)
131 expect(queuePool
.info
.executedTasks
).toBe(0)
132 expect(queuePool
.info
.executingTasks
).toBe(
133 numberOfThreads
* queuePool
.opts
.tasksQueueOptions
.concurrency
135 expect(queuePool
.info
.queuedTasks
).toBe(
137 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
139 expect(queuePool
.info
.maxQueuedTasks
).toBe(
141 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
143 expect(queuePool
.info
.backPressure
).toBe(false)
144 expect(queuePool
.info
.stolenTasks
).toBe(0)
145 await Promise
.all(promises
)
146 for (const workerNode
of queuePool
.workerNodes
) {
147 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
148 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
149 numberOfThreads
* maxMultiplier
151 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
152 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
153 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
154 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
156 expect(workerNode
.usage
.tasks
.stolen
).toBeGreaterThanOrEqual(0)
157 expect(workerNode
.usage
.tasks
.stolen
).toBeLessThanOrEqual(
158 numberOfThreads
* maxMultiplier
161 expect(queuePool
.info
.executedTasks
).toBe(numberOfThreads
* maxMultiplier
)
162 expect(queuePool
.info
.backPressure
).toBe(false)
163 expect(queuePool
.info
.stolenTasks
).toBeGreaterThanOrEqual(0)
164 expect(queuePool
.info
.stolenTasks
).toBeLessThanOrEqual(
165 numberOfThreads
* maxMultiplier
169 it('Verify that is possible to have a worker that return undefined', async () => {
170 const result
= await emptyPool
.execute()
171 expect(result
).toBeUndefined()
174 it('Verify that data are sent to the worker correctly', async () => {
175 const data
= { f
: 10 }
176 const result
= await echoPool
.execute(data
)
177 expect(result
).toStrictEqual(data
)
180 it('Verify that transferable objects are sent to the worker correctly', async () => {
184 result
= await pool
.execute(undefined, undefined, [
186 new MessageChannel().port1
191 expect(result
).toStrictEqual({ ok
: 1 })
192 expect(error
).toBeUndefined()
194 result
= await pool
.execute(undefined, undefined, [
195 new SharedArrayBuffer(16)
200 expect(result
).toStrictEqual({ ok
: 1 })
201 expect(error
).toStrictEqual(
202 new TypeError('Found invalid object in transferList')
206 it('Verify that error handling is working properly:sync', async () => {
207 const data
= { f
: 10 }
209 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
214 await errorPool
.execute(data
)
218 expect(inError
).toBeDefined()
219 expect(inError
).toBeInstanceOf(Error
)
220 expect(inError
.message
).toBeDefined()
221 expect(typeof inError
.message
=== 'string').toBe(true)
222 expect(inError
.message
).toBe('Error Message from ThreadWorker')
223 expect(taskError
).toStrictEqual({
224 name
: DEFAULT_TASK_NAME
,
225 message
: new Error('Error Message from ThreadWorker'),
229 errorPool
.workerNodes
.some(
230 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
235 it('Verify that error handling is working properly:async', async () => {
236 const data
= { f
: 10 }
238 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
243 await asyncErrorPool
.execute(data
)
247 expect(inError
).toBeDefined()
248 expect(inError
).toBeInstanceOf(Error
)
249 expect(inError
.message
).toBeDefined()
250 expect(typeof inError
.message
=== 'string').toBe(true)
251 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
252 expect(taskError
).toStrictEqual({
253 name
: DEFAULT_TASK_NAME
,
254 message
: new Error('Error Message from ThreadWorker:async'),
258 asyncErrorPool
.workerNodes
.some(
259 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
264 it('Verify that async function is working properly', async () => {
265 const data
= { f
: 10 }
266 const startTime
= performance
.now()
267 const result
= await asyncPool
.execute(data
)
268 const usedTime
= performance
.now() - startTime
269 expect(result
).toStrictEqual(data
)
270 expect(usedTime
).toBeGreaterThanOrEqual(2000)
273 it('Shutdown test', async () => {
274 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
276 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
278 const numberOfExitEvents
= await exitPromise
279 expect(pool
.started
).toBe(false)
280 expect(pool
.workerNodes
.length
).toBe(0)
281 expect(numberOfExitEvents
).toBe(numberOfThreads
)
282 expect(poolDestroy
).toBe(1)
285 it('Verify that thread pool options are checked', async () => {
286 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
287 let pool
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
288 expect(pool
.opts
.workerOptions
).toBeUndefined()
290 pool
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
292 env
: { TEST
: 'test' },
296 expect(pool
.opts
.workerOptions
).toStrictEqual({
297 env
: { TEST
: 'test' },
303 it('Should work even without opts in input', async () => {
304 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
305 const pool
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
306 const res
= await pool
.execute()
307 expect(res
).toStrictEqual({ ok
: 1 })
308 // We need to clean up the resources after our test
312 it('Verify destroyWorkerNode()', async () => {
313 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
314 const pool
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
315 const workerNodeKey
= 0
317 pool
.workerNodes
[workerNodeKey
].worker
.on('exit', () => {
320 await
expect(pool
.destroyWorkerNode(workerNodeKey
)).resolves
.toBeUndefined()
321 expect(exitEvent
).toBe(1)
322 expect(pool
.workerNodes
.length
).toBe(numberOfThreads
- 1)
326 it('Verify that a pool with zero worker fails', async () => {
328 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
329 ).toThrowError('Cannot instantiate a fixed pool with zero worker')