1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const TestUtils
= require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker thread', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(121393)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toBe(false)
80 it("Verify that 'busy' event is emitted", async () => {
82 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
83 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
86 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
87 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
88 expect(poolBusy
).toBe(numberOfThreads
+ 1)
91 it('Verify that tasks queuing is working', async () => {
92 const promises
= new Set()
93 const maxMultiplier
= 2
94 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
95 promises
.add(queuePool
.execute())
97 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
98 for (const workerNode
of queuePool
.workerNodes
) {
99 expect(workerNode
.tasksUsage
.running
).toBeLessThanOrEqual(
100 queuePool
.opts
.tasksQueueOptions
.concurrency
102 expect(workerNode
.tasksUsage
.ran
).toBe(0)
103 expect(workerNode
.tasksQueue
.size
).toBeGreaterThan(0)
105 expect(queuePool
.info
.runningTasks
).toBe(numberOfThreads
)
106 expect(queuePool
.info
.queuedTasks
).toBe(
107 numberOfThreads
* maxMultiplier
- numberOfThreads
109 expect(queuePool
.info
.maxQueuedTasks
).toBe(
110 numberOfThreads
* maxMultiplier
- numberOfThreads
112 await Promise
.all(promises
)
113 for (const workerNode
of queuePool
.workerNodes
) {
114 expect(workerNode
.tasksUsage
.running
).toBe(0)
115 expect(workerNode
.tasksUsage
.ran
).toBeGreaterThan(0)
116 expect(workerNode
.tasksUsage
.ran
).toBeLessThanOrEqual(maxMultiplier
)
117 expect(workerNode
.tasksQueue
.size
).toBe(0)
121 it('Verify that is possible to have a worker that return undefined', async () => {
122 const result
= await emptyPool
.execute()
123 expect(result
).toBeUndefined()
126 it('Verify that data are sent to the worker correctly', async () => {
127 const data
= { f
: 10 }
128 const result
= await echoPool
.execute(data
)
129 expect(result
).toStrictEqual(data
)
132 it('Verify that error handling is working properly:sync', async () => {
133 const data
= { f
: 10 }
135 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
140 await errorPool
.execute(data
)
144 expect(inError
).toBeDefined()
145 expect(inError
).toBeInstanceOf(Error
)
146 expect(inError
.message
).toBeDefined()
147 expect(typeof inError
.message
=== 'string').toBe(true)
148 expect(inError
.message
).toBe('Error Message from ThreadWorker')
149 expect(taskError
).toStrictEqual({
150 message
: new Error('Error Message from ThreadWorker'),
154 errorPool
.workerNodes
.some(
155 workerNode
=> workerNode
.tasksUsage
.error
=== 1
160 it('Verify that error handling is working properly:async', async () => {
161 const data
= { f
: 10 }
163 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
168 await asyncErrorPool
.execute(data
)
172 expect(inError
).toBeDefined()
173 expect(inError
).toBeInstanceOf(Error
)
174 expect(inError
.message
).toBeDefined()
175 expect(typeof inError
.message
=== 'string').toBe(true)
176 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
177 expect(taskError
).toStrictEqual({
178 message
: new Error('Error Message from ThreadWorker:async'),
182 asyncErrorPool
.workerNodes
.some(
183 workerNode
=> workerNode
.tasksUsage
.error
=== 1
188 it('Verify that async function is working properly', async () => {
189 const data
= { f
: 10 }
190 const startTime
= performance
.now()
191 const result
= await asyncPool
.execute(data
)
192 const usedTime
= performance
.now() - startTime
193 expect(result
).toStrictEqual(data
)
194 expect(usedTime
).toBeGreaterThanOrEqual(2000)
197 it('Shutdown test', async () => {
198 const exitPromise
= TestUtils
.waitWorkerExits(pool
, numberOfThreads
)
200 const numberOfExitEvents
= await exitPromise
201 expect(numberOfExitEvents
).toBe(numberOfThreads
)
204 it('Verify that thread pool options are checked', async () => {
205 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
206 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
207 expect(pool1
.opts
.workerOptions
).toBeUndefined()
208 await pool1
.destroy()
209 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
211 env
: { TEST
: 'test' },
215 expect(pool1
.opts
.workerOptions
).toStrictEqual({
216 env
: { TEST
: 'test' },
219 await pool1
.destroy()
222 it('Should work even without opts in input', async () => {
223 const pool1
= new FixedThreadPool(
225 './tests/worker-files/thread/testWorker.js'
227 const res
= await pool1
.execute()
228 expect(res
).toBe(false)
229 // We need to clean up the resources after our test
230 await pool1
.destroy()
233 it('Verify that a pool with zero worker fails', async () => {
235 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
236 ).toThrowError('Cannot instantiate a fixed pool with no worker')