1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
// Number of worker threads every fixed pool in this suite is created with.
const numberOfThreads = 6
// Per-worker task concurrency configured on the queue-enabled pool.
const tasksConcurrency = 2
// General-purpose pool reused by several tests below; worker errors are logged to the console.
// The worker count comes first, matching the constructor calls made later in this suite.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool with the tasks queue enabled. The concurrency lives under `tasksQueueOptions`
// because the queuing test reads it back from `queuePool.opts.tasksQueueOptions.concurrency`.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: tasksConcurrency
    },
    errorHandler: (e) => console.error(e)
  }
)
// Pool backed by the "empty" worker file; logs an informational message when a worker exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)
// Pool backed by the echo worker file, created with default options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)
// Pool backed by the error worker file (used by the synchronous error-handling test);
// worker errors are logged to the console.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool backed by the async error worker file (used by the asynchronous error-handling test);
// worker errors are logged to the console.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)
// Pool backed by the async worker file, created with default options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
// Suite-level teardown: destroy the shared pools that no individual test tears down itself.
// `pool` is deliberately absent from this list; it is the subject of the shutdown test.
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Runs two named task functions through the shared pool and checks the values they return.
it('Verify that the function is executed in a worker thread', async () => {
  let result = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(result).toBe(75025)
  result = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(result).toBe(9.33262154439441e157)
})
// Calling execute() with no arguments must still resolve with the test worker's `{ ok: 1 }` reply.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// A freshly created pool must emit the `ready` event exactly once.
it("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  // Counter incremented by the listener; must be declared before the listener is registered.
  let poolReady = 0
  pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
  await waitPoolEvents(pool1, PoolEvents.ready, 1)
  expect(poolReady).toBe(1)
  // Release the worker threads created for this test, as the other tests owning a local pool do.
  await pool1.destroy()
})
// Submits twice as many tasks as there are workers and counts the resulting `busy` events.
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  // Counter incremented by the listener; must be declared before the listener is registered.
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfThreads + 1)
})
// Floods the queue-enabled pool with `maxMultiplier` tasks per worker, then checks the
// per-worker and pool-wide task counters both while tasks are pending and after completion.
it('Verify that tasks queuing is working', async () => {
  const promises = new Set()
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(numberOfThreads * maxMultiplier)
  // While tasks are pending: each worker runs at most `concurrency` tasks and queues the rest.
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.executed).toBe(0)
    expect(workerNode.usage.tasks.queued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
  }
  expect(queuePool.info.executingTasks).toBe(
    numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
  )
  // Pool-wide totals are the per-worker queued count asserted above, summed over all workers.
  // NOTE(review): the `numberOfThreads *` factor is derived from the per-worker assertions; confirm.
  expect(queuePool.info.queuedTasks).toBe(
    numberOfThreads *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfThreads *
      (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
  )
  expect(queuePool.info.backPressure).toBe(false)
  await Promise.all(promises)
  // After completion: every worker executed its share and its queue is drained,
  // while `maxQueued` keeps the high-water mark reached earlier.
  for (const workerNode of queuePool.workerNodes) {
    expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
    expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
      numberOfThreads * maxMultiplier
    )
    expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
    expect(workerNode.usage.tasks.queued).toBe(0)
    expect(workerNode.usage.tasks.maxQueued).toBe(
      maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
    )
  }
})
// A task executed on the empty worker pool must resolve with `undefined`.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// The echo pool must resolve with a value structurally equal to the data it was given.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// Passes a transfer list as the third execute() argument: a valid transferable must be accepted,
// an object that is not transferable must be rejected with a TypeError.
it('Verify that transferable objects are sent to the worker correctly', async () => {
  // Both variables are assigned inside try/catch blocks, so they are declared up front.
  let error
  let result
  try {
    // NOTE(review): the transfer list may have contained further entries originally; confirm.
    result = await pool.execute(undefined, undefined, [
      new MessageChannel().port1
    ])
  } catch (e) {
    error = e
  }
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toBeUndefined()
  try {
    result = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16)
    ])
  } catch (e) {
    error = e
  }
  // The second call throws before assigning, so `result` still holds the first call's value.
  expect(result).toStrictEqual({ ok: 1 })
  expect(error).toStrictEqual(
    new TypeError('Found invalid object in transferList')
  )
})
// Test: executes a task on `errorPool` and checks both an error object (`inError`) and the
// payload seen by the pool's `taskError` listener (`taskError`).
// NOTE(review): `inError` and `taskError` are used below but never declared or assigned in the
// visible code, and several closing tokens are absent — lines appear to be missing from this
// block. Restore them from the upstream file before relying on this test.
191 it('Verify that error handling is working properly:sync', async () => {
192 const data
= { f
: 10 }
// Registers a listener for task errors on the pool emitter; the listener body is not visible here.
194 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
// Runs the failing task. NOTE(review): presumably wrapped in try/catch that stores the
// rejection in `inError` — confirm.
199 await errorPool
.execute(data
)
// `inError` must be an Error instance carrying the exact message asserted below.
203 expect(inError
).toBeDefined()
204 expect(inError
).toBeInstanceOf(Error
)
205 expect(inError
.message
).toBeDefined()
206 expect(typeof inError
.message
=== 'string').toBe(true)
207 expect(inError
.message
).toBe('Error Message from ThreadWorker')
// Expected shape of the `taskError` payload. NOTE(review): only the `message` property is
// visible; the trailing comma suggests further expected properties were lost — confirm.
208 expect(taskError
).toStrictEqual({
210 message
: new Error('Error Message from ThreadWorker'),
// Checks whether some worker node recorded exactly one failed task.
// NOTE(review): the surrounding `expect(...)` wrapper and its matcher are not visible — confirm.
214 errorPool
.workerNodes
.some(
215 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Test: executes a task on `asyncErrorPool` and checks both an error object (`inError`) and the
// payload seen by the pool's `taskError` listener (`taskError`).
// NOTE(review): `inError` and `taskError` are used below but never declared or assigned in the
// visible code, and several closing tokens are absent — lines appear to be missing from this
// block. Restore them from the upstream file before relying on this test.
220 it('Verify that error handling is working properly:async', async () => {
221 const data
= { f
: 10 }
// Registers a listener for task errors on the pool emitter; the listener body is not visible here.
223 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
// Runs the failing task. NOTE(review): presumably wrapped in try/catch that stores the
// rejection in `inError` — confirm.
228 await asyncErrorPool
.execute(data
)
// `inError` must be an Error instance carrying the exact message asserted below.
232 expect(inError
).toBeDefined()
233 expect(inError
).toBeInstanceOf(Error
)
234 expect(inError
.message
).toBeDefined()
235 expect(typeof inError
.message
=== 'string').toBe(true)
236 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
// Expected shape of the `taskError` payload. NOTE(review): only the `message` property is
// visible; the trailing comma suggests further expected properties were lost — confirm.
237 expect(taskError
).toStrictEqual({
239 message
: new Error('Error Message from ThreadWorker:async'),
// Checks whether some worker node recorded exactly one failed task.
// NOTE(review): the surrounding `expect(...)` wrapper and its matcher are not visible — confirm.
243 asyncErrorPool
.workerNodes
.some(
244 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// The async worker must echo its input back, and the round trip must take at least 2000 ms.
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Destroys the shared pool and checks that every worker exits and `destroy` is emitted once.
it('Shutdown test', async () => {
  // Start listening for worker exits before triggering the shutdown so no event is missed.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // Counter incremented by the listener; must be declared before the listener is registered.
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  // Without this call nothing triggers the exit events awaited below and the test would hang.
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(numberOfExitEvents).toBe(numberOfThreads)
  expect(poolDestroy).toBe(1)
})
// `opts.workerOptions` must be undefined by default and must reflect the value passed in.
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.js'
  let pool1 = new FixedThreadPool(numberOfThreads, workerFilePath)
  expect(pool1.opts.workerOptions).toBeUndefined()
  await pool1.destroy()
  // The options are nested under `workerOptions`, the property the assertion below reads back.
  // NOTE(review): the original may have set further worker options alongside `env`; confirm.
  pool1 = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' }
    }
  })
  expect(pool1.opts.workerOptions).toStrictEqual({
    env: { TEST: 'test' }
  })
  await pool1.destroy()
})
// A pool created without an options argument must still execute tasks.
it('Should work even without opts in input', async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  const res = await pool1.execute()
  expect(res).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await pool1.destroy()
})
// Constructing a fixed pool with zero workers must throw with the message asserted below.
it('Verify that a pool with zero worker fails', async () => {
  // The constructor call is wrapped in a function so the matcher can catch the throw.
  // `toThrow` is used in place of its deprecated alias `toThrowError`.
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})