1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
// Number of workers every fixed pool of this suite is created with.
const numberOfThreads = 6
// Per-worker concurrency configured on the tasks-queue-enabled pool.
const tasksConcurrency = 2
// Prints worker errors on stderr; used by the pools that register an error handler.
const logWorkerError = (e) => console.error(e)

// General purpose pool running the default test worker.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  { errorHandler: logWorkerError }
)

// Same worker file, with the tasks queue enabled.
const queuePoolOptions = {
  enableTasksQueue: true,
  tasksQueueOptions: { concurrency: tasksConcurrency },
  errorHandler: logWorkerError
}
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  queuePoolOptions
)

// Pool running the empty worker file; logs a message whenever one of its workers exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  {
    exitHandler: () => console.info('empty pool worker exited')
  }
)

// Pool running the echo worker file, created without options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pools running the error worker files (sync and async variants).
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  { errorHandler: logWorkerError }
)
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  { errorHandler: logWorkerError }
)

// Pool running the async worker file, created without options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
after('Destroy all pools', async () => {
  // Release the resources held by the suite-level pools once the tests are done.
  // Pools are destroyed one at a time, in the order listed here.
  const poolsToDestroy = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool
  ]
  for (const suitePool of poolsToDestroy) {
    await suitePool.destroy()
  }
})
it('Verify that the function is executed in a worker thread', async () => {
  // Run the fibonacci task function first and check its result...
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(fibonacciResult).toBe(75025)

  // ...then the factorial task function.
  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
it('Verify that is possible to invoke the execute() method without input', async () => {
  // Calling execute() with no argument must still resolve with the worker's answer.
  const expected = { ok: 1 }
  expect(await pool.execute()).toStrictEqual(expected)
})
it("Verify that 'ready' event is emitted", async () => {
  // A dedicated pool is created so that its own `ready` event can be observed.
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    // Counts how many times the pool emitted `ready`.
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, PoolEvents.ready, 1)
    expect(poolReady).toBe(1)
  } finally {
    // Always release the worker threads of the test-local pool, even when an
    // expectation fails, so they do not outlive this test.
    await pool1.destroy()
  }
})
it("Verify that 'busy' event is emitted", async () => {
  const promises = new Set()
  // Counts how many times the pool emitted `busy`; it must start at zero for
  // the final expectation to be meaningful.
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    promises.add(pool.execute())
  }
  await Promise.all(promises)
  // The `busy` event is triggered when the number of tasks submitted at once reaches the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfThreads + 1)
})
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfThreads * maxMultiplier
  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks a single worker is expected to hold in its queue right after submission.
  const queuedPerWorker = maxMultiplier - concurrency

  const promises = new Set()
  for (let submitted = 0; submitted < submittedTasks; submitted++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  // Checked synchronously after submission, before the tasks are awaited.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBe(0)
  }
  expect(queuePool.info.executedTasks).toBe(0)
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBe(0)

  await Promise.all(promises)

  // Checked once every submitted task has settled.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(submittedTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(submittedTasks)
  }
  expect(queuePool.info.executedTasks).toBe(submittedTasks)
  expect(queuePool.info.backPressure).toBe(false)
  expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(submittedTasks)
})
it('Verify that is possible to have a worker that return undefined', async () => {
  // The task run on the empty worker pool must resolve with no value at all.
  expect(await emptyPool.execute()).toBeUndefined()
})
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to hand back a value strictly equal to what it was given.
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
// Submits tasks to `pool` passing an array as the third `execute()` argument
// (the transfer list, per the test title and the expected error message) and
// checks both the task result and the value of `error`.
// NOTE(review): the declarations of `result` / `error` and the code that
// assigns `error` are not visible in this excerpt — confirm against the full file.
178 it('Verify that transferable objects are sent to the worker correctly', async () => {
// First submission: `result` must equal { ok: 1 } and `error` must stay undefined.
182 result
= await pool
.execute(undefined, undefined, [
184 new MessageChannel().port1
189 expect(result
).toStrictEqual({ ok
: 1 })
190 expect(error
).toBeUndefined()
// Second submission, with a SharedArrayBuffer in the list: `error` is expected to be
// a TypeError while `result` still equals { ok: 1 }.
192 result
= await pool
.execute(undefined, undefined, [
193 new SharedArrayBuffer(16)
198 expect(result
).toStrictEqual({ ok
: 1 })
199 expect(error
).toStrictEqual(
200 new TypeError('Found invalid object in transferList')
// Runs a task on `errorPool` and checks three things: the error held in `inError`,
// the value held in `taskError`, and the workers' failed-task counter.
// NOTE(review): the declarations/assignments of `inError` and `taskError`, the body
// of the `taskError` listener and part of the expected object literal are not
// visible in this excerpt — confirm against the full file.
204 it('Verify that error handling is working properly:sync', async () => {
205 const data
= { f
: 10 }
// Listen to the pool's `taskError` event.
207 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
212 await errorPool
.execute(data
)
// `inError` must be an Error carrying the expected string message.
216 expect(inError
).toBeDefined()
217 expect(inError
).toBeInstanceOf(Error
)
218 expect(inError
.message
).toBeDefined()
219 expect(typeof inError
.message
=== 'string').toBe(true)
220 expect(inError
.message
).toBe('Error Message from ThreadWorker')
221 expect(taskError
).toStrictEqual({
223 message
: new Error('Error Message from ThreadWorker'),
// Looks for a worker node whose failed tasks counter equals 1.
227 errorPool
.workerNodes
.some(
228 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Runs a task on `asyncErrorPool` and checks three things: the error held in `inError`,
// the value held in `taskError`, and the workers' failed-task counter.
// NOTE(review): the declarations/assignments of `inError` and `taskError`, the body
// of the `taskError` listener and part of the expected object literal are not
// visible in this excerpt — confirm against the full file.
233 it('Verify that error handling is working properly:async', async () => {
234 const data
= { f
: 10 }
// Listen to the pool's `taskError` event.
236 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
241 await asyncErrorPool
.execute(data
)
// `inError` must be an Error carrying the expected string message.
245 expect(inError
).toBeDefined()
246 expect(inError
).toBeInstanceOf(Error
)
247 expect(inError
.message
).toBeDefined()
248 expect(typeof inError
.message
=== 'string').toBe(true)
249 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
250 expect(taskError
).toStrictEqual({
252 message
: new Error('Error Message from ThreadWorker:async'),
// Looks for a worker node whose failed tasks counter equals 1.
256 asyncErrorPool
.workerNodes
.some(
257 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure how long the task takes to settle.
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsedMs = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // The task is expected to take at least two seconds to complete.
  expect(elapsedMs).toBeGreaterThanOrEqual(2000)
})
it('Shutdown test', async () => {
  // Start waiting for one `exit` event per worker before triggering the shutdown.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // Counts how many times the pool emitted `destroy`.
  let poolDestroy = 0
  pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
  // Trigger the shutdown; without it neither the worker `exit` events nor the
  // pool `destroy` event awaited below can occur.
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(numberOfExitEvents).toBe(numberOfThreads)
  expect(poolDestroy).toBe(1)
})
// Checks `pool1.opts.workerOptions` for two pools built on the same worker file:
// one created without options, one created with an options object.
// NOTE(review): parts of the options object and of the expected object literal are
// not visible in this excerpt — confirm against the full file.
281 it('Verify that thread pool options are checked', async () => {
282 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
// Without an options argument, `opts.workerOptions` must be undefined.
283 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
284 expect(pool1
.opts
.workerOptions
).toBeUndefined()
285 await pool1
.destroy()
// With an options argument, `opts.workerOptions` must strictly equal the expected object.
286 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
288 env
: { TEST
: 'test' },
292 expect(pool1
.opts
.workerOptions
).toStrictEqual({
293 env
: { TEST
: 'test' },
296 await pool1
.destroy()
it('Should work even without opts in input', async () => {
  // Build a pool from the worker count and worker file only, with no options object.
  const optionlessPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  const taskResult = await optionlessPool.execute()
  expect(taskResult).toStrictEqual({ ok: 1 })
  // We need to clean up the resources after our test
  await optionlessPool.destroy()
})
it('Verify that a pool with zero worker fails', async () => {
  // The constructor call is wrapped in a function so that expect() can catch the
  // exception it throws. `toThrow` is used instead of its `toThrowError` alias,
  // which newer releases of `expect` no longer provide.
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})