1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const TestUtils
= require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker thread', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toBe(false)
80 it("Verify that 'busy' event is emitted", async () => {
82 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
83 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
86 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
87 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
88 expect(poolBusy
).toBe(numberOfThreads
+ 1)
91 it('Verify that tasks queuing is working', async () => {
92 const promises
= new Set()
93 const maxMultiplier
= 2
94 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
95 promises
.add(queuePool
.execute())
97 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
98 for (const workerNode
of queuePool
.workerNodes
) {
99 expect(workerNode
.workerUsage
.tasks
.executing
).toBeLessThanOrEqual(
100 queuePool
.opts
.tasksQueueOptions
.concurrency
102 expect(workerNode
.workerUsage
.tasks
.executed
).toBe(0)
103 expect(workerNode
.workerUsage
.tasks
.queued
).toBeGreaterThan(0)
104 expect(workerNode
.workerUsage
.tasks
.maxQueued
).toBeGreaterThan(0)
106 expect(queuePool
.info
.executingTasks
).toBe(numberOfThreads
)
107 expect(queuePool
.info
.queuedTasks
).toBe(
108 numberOfThreads
* maxMultiplier
- numberOfThreads
110 expect(queuePool
.info
.maxQueuedTasks
).toBe(
111 numberOfThreads
* maxMultiplier
- numberOfThreads
113 await Promise
.all(promises
)
114 for (const workerNode
of queuePool
.workerNodes
) {
115 expect(workerNode
.workerUsage
.tasks
.executing
).toBe(0)
116 expect(workerNode
.workerUsage
.tasks
.executed
).toBeGreaterThan(0)
117 expect(workerNode
.workerUsage
.tasks
.executed
).toBeLessThanOrEqual(
120 expect(workerNode
.workerUsage
.tasks
.queued
).toBe(0)
121 expect(workerNode
.workerUsage
.tasks
.maxQueued
).toBe(1)
125 it('Verify that is possible to have a worker that return undefined', async () => {
126 const result
= await emptyPool
.execute()
127 expect(result
).toBeUndefined()
130 it('Verify that data are sent to the worker correctly', async () => {
131 const data
= { f
: 10 }
132 const result
= await echoPool
.execute(data
)
133 expect(result
).toStrictEqual(data
)
136 it('Verify that error handling is working properly:sync', async () => {
137 const data
= { f
: 10 }
139 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
144 await errorPool
.execute(data
)
148 expect(inError
).toBeDefined()
149 expect(inError
).toBeInstanceOf(Error
)
150 expect(inError
.message
).toBeDefined()
151 expect(typeof inError
.message
=== 'string').toBe(true)
152 expect(inError
.message
).toBe('Error Message from ThreadWorker')
153 expect(taskError
).toStrictEqual({
154 message
: new Error('Error Message from ThreadWorker'),
158 errorPool
.workerNodes
.some(
159 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
164 it('Verify that error handling is working properly:async', async () => {
165 const data
= { f
: 10 }
167 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
172 await asyncErrorPool
.execute(data
)
176 expect(inError
).toBeDefined()
177 expect(inError
).toBeInstanceOf(Error
)
178 expect(inError
.message
).toBeDefined()
179 expect(typeof inError
.message
=== 'string').toBe(true)
180 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
181 expect(taskError
).toStrictEqual({
182 message
: new Error('Error Message from ThreadWorker:async'),
186 asyncErrorPool
.workerNodes
.some(
187 workerNode
=> workerNode
.workerUsage
.tasks
.failed
=== 1
192 it('Verify that async function is working properly', async () => {
193 const data
= { f
: 10 }
194 const startTime
= performance
.now()
195 const result
= await asyncPool
.execute(data
)
196 const usedTime
= performance
.now() - startTime
197 expect(result
).toStrictEqual(data
)
198 expect(usedTime
).toBeGreaterThanOrEqual(2000)
201 it('Shutdown test', async () => {
202 const exitPromise
= TestUtils
.waitWorkerEvents(
208 const numberOfExitEvents
= await exitPromise
209 expect(numberOfExitEvents
).toBe(numberOfThreads
)
212 it('Verify that thread pool options are checked', async () => {
213 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
214 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
215 expect(pool1
.opts
.workerOptions
).toBeUndefined()
216 await pool1
.destroy()
217 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
219 env
: { TEST
: 'test' },
223 expect(pool1
.opts
.workerOptions
).toStrictEqual({
224 env
: { TEST
: 'test' },
227 await pool1
.destroy()
230 it('Should work even without opts in input', async () => {
231 const pool1
= new FixedThreadPool(
233 './tests/worker-files/thread/testWorker.js'
235 const res
= await pool1
.execute()
236 expect(res
).toBe(false)
237 // We need to clean up the resources after our test
238 await pool1
.destroy()
241 it('Verify that a pool with zero worker fails', async () => {
243 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
244 ).toThrowError('Cannot instantiate a fixed pool with no worker')