618a67dcf96879c83ae76e8e1c227df1028eec2d
1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker thread', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it
.skip("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
90 pool1
.emitter
.on(PoolEvents
.ready
, info
=> {
94 expect(poolReady
).toBe(1)
95 expect(poolInfo
).toBeDefined()
98 it("Verify that 'busy' event is emitted", async () => {
100 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
101 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
104 // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
105 // So in total it is emitted numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
106 expect(poolBusy
).toBe(numberOfThreads
+ 1)
109 it('Verify that tasks queuing is working', async () => {
110 const promises
= new Set()
111 const maxMultiplier
= 2
112 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
113 promises
.add(queuePool
.execute())
115 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
116 for (const workerNode
of queuePool
.workerNodes
) {
117 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
118 queuePool
.opts
.tasksQueueOptions
.concurrency
120 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
121 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
122 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
124 expect(queuePool
.info
.executingTasks
).toBe(numberOfThreads
)
125 expect(queuePool
.info
.queuedTasks
).toBe(
126 numberOfThreads
* maxMultiplier
- numberOfThreads
128 expect(queuePool
.info
.maxQueuedTasks
).toBe(
129 numberOfThreads
* maxMultiplier
- numberOfThreads
131 await Promise
.all(promises
)
132 for (const workerNode
of queuePool
.workerNodes
) {
133 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
134 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
135 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
136 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
137 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
141 it('Verify that is possible to have a worker that return undefined', async () => {
142 const result
= await emptyPool
.execute()
143 expect(result
).toBeUndefined()
146 it('Verify that data are sent to the worker correctly', async () => {
147 const data
= { f
: 10 }
148 const result
= await echoPool
.execute(data
)
149 expect(result
).toStrictEqual(data
)
152 it('Verify that error handling is working properly:sync', async () => {
153 const data
= { f
: 10 }
155 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
160 await errorPool
.execute(data
)
164 expect(inError
).toBeDefined()
165 expect(inError
).toBeInstanceOf(Error
)
166 expect(inError
.message
).toBeDefined()
167 expect(typeof inError
.message
=== 'string').toBe(true)
168 expect(inError
.message
).toBe('Error Message from ThreadWorker')
169 expect(taskError
).toStrictEqual({
171 message
: new Error('Error Message from ThreadWorker'),
175 errorPool
.workerNodes
.some(
176 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
181 it('Verify that error handling is working properly:async', async () => {
182 const data
= { f
: 10 }
184 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
189 await asyncErrorPool
.execute(data
)
193 expect(inError
).toBeDefined()
194 expect(inError
).toBeInstanceOf(Error
)
195 expect(inError
.message
).toBeDefined()
196 expect(typeof inError
.message
=== 'string').toBe(true)
197 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
198 expect(taskError
).toStrictEqual({
200 message
: new Error('Error Message from ThreadWorker:async'),
204 asyncErrorPool
.workerNodes
.some(
205 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
210 it('Verify that async function is working properly', async () => {
211 const data
= { f
: 10 }
212 const startTime
= performance
.now()
213 const result
= await asyncPool
.execute(data
)
214 const usedTime
= performance
.now() - startTime
215 expect(result
).toStrictEqual(data
)
216 expect(usedTime
).toBeGreaterThanOrEqual(2000)
219 it('Shutdown test', async () => {
220 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
222 const numberOfExitEvents
= await exitPromise
223 expect(numberOfExitEvents
).toBe(numberOfThreads
)
226 it('Verify that thread pool options are checked', async () => {
227 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
228 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
229 expect(pool1
.opts
.workerOptions
).toBeUndefined()
230 await pool1
.destroy()
231 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
233 env
: { TEST
: 'test' },
237 expect(pool1
.opts
.workerOptions
).toStrictEqual({
238 env
: { TEST
: 'test' },
241 await pool1
.destroy()
244 it('Should work even without opts in input', async () => {
245 const pool1
= new FixedThreadPool(
247 './tests/worker-files/thread/testWorker.js'
249 const res
= await pool1
.execute()
250 expect(res
).toStrictEqual({ ok
: 1 })
251 // We need to clean up the resources after our test
252 await pool1
.destroy()
255 it('Verify that a pool with zero worker fails', async () => {
257 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
258 ).toThrowError('Cannot instantiate a fixed pool with zero worker')