e5d9831dc462c1b6bf887e606888fca9a5f78e33
1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const TestUtils
= require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
after('Destroy all pools', async () => {
  // Release the resources held by the shared pools once the suite is done.
  // The pools are destroyed one after another, in this exact order.
  const sharedPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const sharedPool of sharedPools) {
    await sharedPool.destroy()
  }
})
it('Verify that the function is executed in a worker thread', async () => {
  // Run the fibonacci task function on the shared pool and check its result.
  const fibonacciResult = await pool.execute({
    function: WorkerFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // Same check for the factorial task function.
  const factorialResult = await pool.execute({
    function: WorkerFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no task data; the test expects `false` back.
  expect(await pool.execute()).toBe(false)
})
80 it("Verify that 'busy' event is emitted", async () => {
82 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
83 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
86 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
87 // So it is emitted numberOfThreads + 1 times in total for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
88 expect(poolBusy
).toBe(numberOfThreads
+ 1)
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 2
  const submittedTasks = numberOfThreads * maxMultiplier
  const expectedQueuedTasks = submittedTasks - numberOfThreads

  // Submit all tasks up front without awaiting them.
  const pendingTasks = new Set()
  let submitted = 0
  while (submitted < submittedTasks) {
    pendingTasks.add(queuePool.execute())
    submitted++
  }
  expect(pendingTasks.size).toBe(submittedTasks)

  // Checked synchronously after submission, before the promises are awaited:
  // nothing has been executed yet and every worker node has tasks queued.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBeLessThanOrEqual(
      queuePool.opts.tasksQueueOptions.concurrency
    )
    expect(usage.tasks.executed).toBe(0)
    expect(usage.tasks.queued).toBeGreaterThan(0)
    expect(usage.tasks.maxQueued).toBeGreaterThan(0)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfThreads)
  expect(queuePool.info.queuedTasks).toBe(expectedQueuedTasks)
  expect(queuePool.info.maxQueuedTasks).toBe(expectedQueuedTasks)

  await Promise.all(pendingTasks)

  // Once every task has settled the queues must be drained again.
  for (const { usage } of queuePool.workerNodes) {
    expect(usage.tasks.executing).toBe(0)
    expect(usage.tasks.executed).toBeGreaterThan(0)
    expect(usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
    expect(usage.tasks.queued).toBe(0)
    expect(usage.tasks.maxQueued).toBe(1)
  }
})
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool built on emptyWorker.js is expected to resolve to undefined.
  expect(await emptyPool.execute()).toBeUndefined()
})
it('Verify that data are sent to the worker correctly', async () => {
  // The value returned by echoPool must be strictly equal to the payload sent.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
134 it('Verify that error handling is working properly:sync', async () => {
135 const data
= { f
: 10 }
137 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
142 await errorPool
.execute(data
)
146 expect(inError
).toBeDefined()
147 expect(inError
).toBeInstanceOf(Error
)
148 expect(inError
.message
).toBeDefined()
149 expect(typeof inError
.message
=== 'string').toBe(true)
150 expect(inError
.message
).toBe('Error Message from ThreadWorker')
151 expect(taskError
).toStrictEqual({
152 message
: new Error('Error Message from ThreadWorker'),
156 errorPool
.workerNodes
.some(
157 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
162 it('Verify that error handling is working properly:async', async () => {
163 const data
= { f
: 10 }
165 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
170 await asyncErrorPool
.execute(data
)
174 expect(inError
).toBeDefined()
175 expect(inError
).toBeInstanceOf(Error
)
176 expect(inError
.message
).toBeDefined()
177 expect(typeof inError
.message
=== 'string').toBe(true)
178 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
179 expect(taskError
).toStrictEqual({
180 message
: new Error('Error Message from ThreadWorker:async'),
184 asyncErrorPool
.workerNodes
.some(
185 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }

  // Measure the wall-clock time around the task execution.
  const startedAt = performance.now()
  const outcome = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt

  expect(outcome).toStrictEqual(payload)
  // The test requires the round trip to take at least 2000 ms.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
199 it('Shutdown test', async () => {
200 const exitPromise
= TestUtils
.waitWorkerEvents(
206 const numberOfExitEvents
= await exitPromise
207 expect(numberOfExitEvents
).toBe(numberOfThreads
)
210 it('Verify that thread pool options are checked', async () => {
211 const workerFilePath
= './tests/worker-files/cluster/testWorker.js'
212 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
213 expect(pool1
.opts
.workerOptions
).toBeUndefined()
214 await pool1
.destroy()
215 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
217 env
: { TEST
: 'test' },
221 expect(pool1
.opts
.workerOptions
).toStrictEqual({
222 env
: { TEST
: 'test' },
225 await pool1
.destroy()
228 it('Should work even without opts in input', async () => {
229 const pool1
= new FixedThreadPool(
231 './tests/worker-files/thread/testWorker.js'
233 const res
= await pool1
.execute()
234 expect(res
).toBe(false)
235 // We need to clean up the resources after our test
236 await pool1
.destroy()
it('Verify that a pool with zero worker fails', async () => {
  // Constructing a fixed pool of size 0 must throw with this exact message.
  const buildZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  expect(buildZeroWorkerPool).toThrowError(
    'Cannot instantiate a fixed pool with no worker'
  )
})