636121fd4812eb8705bb2aed57119e8b84f8bcfb
1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { WorkerFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
7 const numberOfThreads
= 6
8 const pool
= new FixedThreadPool(
10 './tests/worker-files/thread/testWorker.js',
12 errorHandler
: e
=> console
.error(e
)
15 const queuePool
= new FixedThreadPool(
17 './tests/worker-files/thread/testWorker.js',
19 enableTasksQueue
: true,
23 errorHandler
: e
=> console
.error(e
)
26 const emptyPool
= new FixedThreadPool(
28 './tests/worker-files/thread/emptyWorker.js',
29 { exitHandler
: () => console
.log('empty pool worker exited') }
31 const echoPool
= new FixedThreadPool(
33 './tests/worker-files/thread/echoWorker.js'
35 const errorPool
= new FixedThreadPool(
37 './tests/worker-files/thread/errorWorker.js',
39 errorHandler
: e
=> console
.error(e
)
42 const asyncErrorPool
= new FixedThreadPool(
44 './tests/worker-files/thread/asyncErrorWorker.js',
46 errorHandler
: e
=> console
.error(e
)
49 const asyncPool
= new FixedThreadPool(
51 './tests/worker-files/thread/asyncWorker.js'
54 after('Destroy all pools', async () => {
55 // We need to clean up the resources after our test
56 await echoPool
.destroy()
57 await asyncPool
.destroy()
58 await errorPool
.destroy()
59 await asyncErrorPool
.destroy()
60 await emptyPool
.destroy()
61 await queuePool
.destroy()
64 it('Verify that the function is executed in a worker thread', async () => {
65 let result
= await pool
.execute({
66 function: WorkerFunctions
.fibonacci
68 expect(result
).toBe(75025)
69 result
= await pool
.execute({
70 function: WorkerFunctions
.factorial
72 expect(result
).toBe(9.33262154439441e157
)
75 it('Verify that is possible to invoke the execute() method without input', async () => {
76 const result
= await pool
.execute()
77 expect(result
).toStrictEqual({ ok
: 1 })
80 it("Verify that 'ready' event is emitted", async () => {
81 const pool1
= new FixedThreadPool(
83 './tests/worker-files/thread/testWorker.js',
85 errorHandler
: e
=> console
.error(e
)
89 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
90 if (!pool1
.info
.ready
) {
91 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
93 expect(poolReady
).toBe(1)
96 it("Verify that 'busy' event is emitted", async () => {
98 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
99 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
102 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
103 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
104 expect(poolBusy
).toBe(numberOfThreads
+ 1)
107 it('Verify that tasks queuing is working', async () => {
108 const promises
= new Set()
109 const maxMultiplier
= 2
110 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
111 promises
.add(queuePool
.execute())
113 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
114 for (const workerNode
of queuePool
.workerNodes
) {
115 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
116 queuePool
.opts
.tasksQueueOptions
.concurrency
118 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
119 expect(workerNode
.usage
.tasks
.queued
).toBeGreaterThan(0)
120 expect(workerNode
.usage
.tasks
.maxQueued
).toBeGreaterThan(0)
122 expect(queuePool
.info
.executingTasks
).toBe(numberOfThreads
)
123 expect(queuePool
.info
.queuedTasks
).toBe(
124 numberOfThreads
* maxMultiplier
- numberOfThreads
126 expect(queuePool
.info
.maxQueuedTasks
).toBe(
127 numberOfThreads
* maxMultiplier
- numberOfThreads
129 await Promise
.all(promises
)
130 for (const workerNode
of queuePool
.workerNodes
) {
131 expect(workerNode
.usage
.tasks
.executing
).toBe(0)
132 expect(workerNode
.usage
.tasks
.executed
).toBeGreaterThan(0)
133 expect(workerNode
.usage
.tasks
.executed
).toBeLessThanOrEqual(maxMultiplier
)
134 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
135 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(1)
139 it('Verify that is possible to have a worker that return undefined', async () => {
140 const result
= await emptyPool
.execute()
141 expect(result
).toBeUndefined()
144 it('Verify that data are sent to the worker correctly', async () => {
145 const data
= { f
: 10 }
146 const result
= await echoPool
.execute(data
)
147 expect(result
).toStrictEqual(data
)
150 it('Verify that error handling is working properly:sync', async () => {
151 const data
= { f
: 10 }
153 errorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
158 await errorPool
.execute(data
)
162 expect(inError
).toBeDefined()
163 expect(inError
).toBeInstanceOf(Error
)
164 expect(inError
.message
).toBeDefined()
165 expect(typeof inError
.message
=== 'string').toBe(true)
166 expect(inError
.message
).toBe('Error Message from ThreadWorker')
167 expect(taskError
).toStrictEqual({
169 message
: new Error('Error Message from ThreadWorker'),
173 errorPool
.workerNodes
.some(
174 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
179 it('Verify that error handling is working properly:async', async () => {
180 const data
= { f
: 10 }
182 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, e
=> {
187 await asyncErrorPool
.execute(data
)
191 expect(inError
).toBeDefined()
192 expect(inError
).toBeInstanceOf(Error
)
193 expect(inError
.message
).toBeDefined()
194 expect(typeof inError
.message
=== 'string').toBe(true)
195 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
196 expect(taskError
).toStrictEqual({
198 message
: new Error('Error Message from ThreadWorker:async'),
202 asyncErrorPool
.workerNodes
.some(
203 workerNode
=> workerNode
.usage
.tasks
.failed
=== 1
208 it('Verify that async function is working properly', async () => {
209 const data
= { f
: 10 }
210 const startTime
= performance
.now()
211 const result
= await asyncPool
.execute(data
)
212 const usedTime
= performance
.now() - startTime
213 expect(result
).toStrictEqual(data
)
214 expect(usedTime
).toBeGreaterThanOrEqual(2000)
217 it('Shutdown test', async () => {
218 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
220 const numberOfExitEvents
= await exitPromise
221 expect(numberOfExitEvents
).toBe(numberOfThreads
)
224 it('Verify that thread pool options are checked', async () => {
225 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
226 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
227 expect(pool1
.opts
.workerOptions
).toBeUndefined()
228 await pool1
.destroy()
229 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
231 env
: { TEST
: 'test' },
235 expect(pool1
.opts
.workerOptions
).toStrictEqual({
236 env
: { TEST
: 'test' },
239 await pool1
.destroy()
242 it('Should work even without opts in input', async () => {
243 const pool1
= new FixedThreadPool(
245 './tests/worker-files/thread/testWorker.js'
247 const res
= await pool1
.execute()
248 expect(res
).toStrictEqual({ ok
: 1 })
249 // We need to clean up the resources after our test
250 await pool1
.destroy()
253 it('Verify that a pool with zero worker fails', async () => {
255 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
256 ).toThrowError('Cannot instantiate a fixed pool with zero worker')