8b2fe42047bc4c91b2a8d01599842878bd9950e5
1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', async () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
16 pool
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
17 await
waitPoolEvents(pool
, PoolEvents
.ready
, 1)
18 const queuePool
= new FixedThreadPool(
20 './tests/worker-files/thread/testWorker.js',
22 enableTasksQueue
: true,
26 errorHandler
: e
=> console
.error(e
)
29 const emptyPool
= new FixedThreadPool(
31 './tests/worker-files/thread/emptyWorker.js',
32 { exitHandler
: () => console
.log('empty pool worker exited') }
34 const echoPool
= new FixedThreadPool(
36 './tests/worker-files/thread/echoWorker.js'
38 const errorPool
= new FixedThreadPool(
40 './tests/worker-files/thread/errorWorker.js',
42 errorHandler
: e
=> console
.error(e
)
45 const asyncErrorPool
= new FixedThreadPool(
47 './tests/worker-files/thread/asyncErrorWorker.js',
49 errorHandler
: e
=> console
.error(e
)
52 const asyncPool
= new FixedThreadPool(
54 './tests/worker-files/thread/asyncWorker.js'
57 after('Destroy all pools', async () => {
58 // We need to clean up the resources after our test
59 await echoPool
.destroy()
60 await asyncPool
.destroy()
61 await errorPool
.destroy()
62 await asyncErrorPool
.destroy()
63 await emptyPool
.destroy()
64 await queuePool
.destroy()
67 it('Verify that the function is executed in a worker thread', async () => {
68 let result
= await pool
.execute({
69 function: WorkerFunctions
.fibonacci
71 expect(result
).toBe(75025)
72 result
= await pool
.execute({
73 function: WorkerFunctions
.factorial
75 expect(result
).toBe(9.33262154439441e157
)
78 it('Verify that is possible to invoke the execute() method without input', async () => {
79 const result
= await pool
.execute()
80 expect(result
).toStrictEqual({ ok
: 1 })
83 it("Verify that 'ready' event is emitted", async () => {
84 expect(poolReady
).toBe(1)
87 it("Verify that 'busy' event is emitted", async () => {
89 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
90 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
93 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
94 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
95 expect(poolBusy
).toBe(numberOfThreads
+ 1)
98 it('Verify that tasks queuing is working', async () => {
99 const promises
= new Set()
100 const maxMultiplier
= 2
101 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
102 promises
.add(queuePool
.execute())
104 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
105 for (const workerNode
of queuePool
.workerNodes
) {
106 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
107 queuePool
.opts
.tasksQueueOptions
.concurrency
109 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
110 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
111 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
113 expect(queuePool
.info
.executingTasks
).toBe(numberOfThreads
)
114 expect(queuePool
.info
.queuedTasks
).toBe(
115 numberOfThreads
* maxMultiplier
- numberOfThreads
117 expect(queuePool
.info
.maxQueuedTasks
).toBe(
118 numberOfThreads
* maxMultiplier
- numberOfThreads
120 await Promise
.all(promises
)
121 for (const workerNode
of queuePool
.workerNodes
) {
122 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
123 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
124 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
125 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
126 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
130 it('Verify that is possible to have a worker that return undefined', async () => {
131 const result
= await emptyPool
.execute()
132 expect(result
).toBeUndefined()
135 it('Verify that data are sent to the worker correctly', async () => {
136 const data
= { f
: 10 }
137 const result
= await echoPool
.execute(data
)
138 expect(result
).toStrictEqual(data
)
141 it('Verify that error handling is working properly:sync', async () => {
142 const data
= { f
: 10 }
144 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
149 await errorPool
.execute(data
)
153 expect(inError
).toBeDefined()
154 expect(inError
).toBeInstanceOf(Error
)
155 expect(inError
.message
).toBeDefined()
156 expect(typeof inError
.message
=== 'string').toBe(true)
157 expect(inError
.message
).toBe('Error Message from ThreadWorker')
158 expect(taskError
).toStrictEqual({
159 message
: new Error('Error Message from ThreadWorker'),
163 errorPool
.workerNodes
.some(
164 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
169 it('Verify that error handling is working properly:async', async () => {
170 const data
= { f
: 10 }
172 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
177 await asyncErrorPool
.execute(data
)
181 expect(inError
).toBeDefined()
182 expect(inError
).toBeInstanceOf(Error
)
183 expect(inError
.message
).toBeDefined()
184 expect(typeof inError
.message
=== 'string').toBe(true)
185 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
186 expect(taskError
).toStrictEqual({
187 message
: new Error('Error Message from ThreadWorker:async'),
191 asyncErrorPool
.workerNodes
.some(
192 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
197 it('Verify that async function is working properly', async () => {
198 const data
= { f
: 10 }
199 const startTime
= performance
.now()
200 const result
= await asyncPool
.execute(data
)
201 const usedTime
= performance
.now() - startTime
202 expect(result
).toStrictEqual(data
)
203 expect(usedTime
).toBeGreaterThanOrEqual(2000)
206 it('Shutdown test', async () => {
207 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
209 const numberOfExitEvents
= await exitPromise
210 expect(numberOfExitEvents
).toBe(numberOfThreads
)
213 it('Verify that thread pool options are checked', async () => {
214 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
215 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
216 expect(pool1
.opts
.workerOptions
).toBeUndefined()
217 await pool1
.destroy()
218 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
220 env
: { TEST
: 'test' },
224 expect(pool1
.opts
.workerOptions
).toStrictEqual({
225 env
: { TEST
: 'test' },
228 await pool1
.destroy()
231 it('Should work even without opts in input', async () => {
232 const pool1
= new FixedThreadPool(
234 './tests/worker-files/thread/testWorker.js'
236 const res
= await pool1
.execute()
237 expect(res
).toStrictEqual({ ok
: 1 })
238 // We need to clean up the resources after our test
239 await pool1
.destroy()
242 it('Verify that a pool with zero worker fails', async () => {
244 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
245 ).toThrowError('Cannot instantiate a fixed pool with zero worker')