1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker thread', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
89 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
90 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
91 expect(poolReady
).toBe(1)
94 it("Verify that 'busy' event is emitted", async () => {
96 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
97 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
100 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
101 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
102 expect(poolBusy
).toBe(numberOfThreads
+ 1)
105 it('Verify that tasks queuing is working', async () => {
106 const promises
= new Set()
107 const maxMultiplier
= 2
108 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
109 promises
.add(queuePool
.execute())
111 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
112 for (const workerNode
of queuePool
.workerNodes
) {
113 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
114 queuePool
.opts
.tasksQueueOptions
.concurrency
116 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
117 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
118 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
120 expect(queuePool
.info
.executingTasks
).toBe(numberOfThreads
)
121 expect(queuePool
.info
.queuedTasks
).toBe(
122 numberOfThreads
* maxMultiplier
- numberOfThreads
124 expect(queuePool
.info
.maxQueuedTasks
).toBe(
125 numberOfThreads
* maxMultiplier
- numberOfThreads
127 await Promise
.all(promises
)
128 for (const workerNode
of queuePool
.workerNodes
) {
129 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
130 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
131 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
132 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
133 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
137 it('Verify that is possible to have a worker that return undefined', async () => {
138 const result
= await emptyPool
.execute()
139 expect(result
).toBeUndefined()
142 it('Verify that data are sent to the worker correctly', async () => {
143 const data
= { f
: 10 }
144 const result
= await echoPool
.execute(data
)
145 expect(result
).toStrictEqual(data
)
148 it('Verify that error handling is working properly:sync', async () => {
149 const data
= { f
: 10 }
151 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
156 await errorPool
.execute(data
)
160 expect(inError
).toBeDefined()
161 expect(inError
).toBeInstanceOf(Error
)
162 expect(inError
.message
).toBeDefined()
163 expect(typeof inError
.message
=== 'string').toBe(true)
164 expect(inError
.message
).toBe('Error Message from ThreadWorker')
165 expect(taskError
).toStrictEqual({
166 message
: new Error('Error Message from ThreadWorker'),
170 errorPool
.workerNodes
.some(
171 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
176 it('Verify that error handling is working properly:async', async () => {
177 const data
= { f
: 10 }
179 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
184 await asyncErrorPool
.execute(data
)
188 expect(inError
).toBeDefined()
189 expect(inError
).toBeInstanceOf(Error
)
190 expect(inError
.message
).toBeDefined()
191 expect(typeof inError
.message
=== 'string').toBe(true)
192 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
193 expect(taskError
).toStrictEqual({
194 message
: new Error('Error Message from ThreadWorker:async'),
198 asyncErrorPool
.workerNodes
.some(
199 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
204 it('Verify that async function is working properly', async () => {
205 const data
= { f
: 10 }
206 const startTime
= performance
.now()
207 const result
= await asyncPool
.execute(data
)
208 const usedTime
= performance
.now() - startTime
209 expect(result
).toStrictEqual(data
)
210 expect(usedTime
).toBeGreaterThanOrEqual(2000)
213 it('Shutdown test', async () => {
214 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
216 const numberOfExitEvents
= await exitPromise
217 expect(numberOfExitEvents
).toBe(numberOfThreads
)
220 it('Verify that thread pool options are checked', async () => {
221 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
222 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
223 expect(pool1
.opts
.workerOptions
).toBeUndefined()
224 await pool1
.destroy()
225 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
227 env
: { TEST
: 'test' },
231 expect(pool1
.opts
.workerOptions
).toStrictEqual({
232 env
: { TEST
: 'test' },
235 await pool1
.destroy()
238 it('Should work even without opts in input', async () => {
239 const pool1
= new FixedThreadPool(
241 './tests/worker-files/thread/testWorker.js'
243 const res
= await pool1
.execute()
244 expect(res
).toStrictEqual({ ok
: 1 })
245 // We need to clean up the resources after our test
246 await pool1
.destroy()
249 it('Verify that a pool with zero worker fails', async () => {
251 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
252 ).toThrowError('Cannot instantiate a fixed pool with zero worker')