1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
6 describe('Fixed thread pool test suite', () => {
// Shared fixtures for the suite. Every pool below is created once and reused
// by the individual test cases.

// Number of worker threads each fixed pool is created with.
const numberOfThreads = 6
// Per-worker concurrency given to the queue-enabled pool.
const tasksConcurrency = 2

// General-purpose pool; worker errors are logged to the console.
const pool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)

// Same worker file as `pool`, but with the tasks queue enabled.
const queuePool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/testWorker.js',
  {
    enableTasksQueue: true,
    tasksQueueOptions: {
      concurrency: tasksConcurrency
    },
    errorHandler: (e) => console.error(e)
  }
)

// Pool backed by the "empty" worker file; logs when one of its workers exits.
const emptyPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/emptyWorker.js',
  { exitHandler: () => console.info('empty pool worker exited') }
)

// Pool backed by the "echo" worker file, created without options.
const echoPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/echoWorker.js'
)

// Pool backed by the "error" worker file.
const errorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/errorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)

// Pool backed by the "async error" worker file.
const asyncErrorPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncErrorWorker.js',
  {
    errorHandler: (e) => console.error(e)
  }
)

// Pool backed by the "async" worker file, created without options.
const asyncPool = new FixedThreadPool(
  numberOfThreads,
  './tests/worker-files/thread/asyncWorker.js'
)
// Suite teardown: destroys the shared pools one after another so their worker
// threads do not outlive the test run. The general-purpose `pool` is not
// destroyed by this hook.
after('Destroy all pools', async () => {
  // We need to clean up the resources after our test
  await echoPool.destroy()
  await asyncPool.destroy()
  await errorPool.destroy()
  await asyncErrorPool.destroy()
  await emptyPool.destroy()
  await queuePool.destroy()
})
// Submits two named task functions to the shared pool and checks the value
// each one resolves with.
it('Verify that the function is executed in a worker thread', async () => {
  let result = await pool.execute({
    function: TaskFunctions.fibonacci
  })
  expect(result).toBe(75025)

  result = await pool.execute({
    function: TaskFunctions.factorial
  })
  expect(result).toBe(9.33262154439441e157)
})
// Calling execute() with no arguments must still resolve; the test worker is
// expected to answer with `{ ok: 1 }`.
it('Verify that is possible to invoke the execute() method without input', async () => {
  const result = await pool.execute()
  expect(result).toStrictEqual({ ok: 1 })
})
// Creates a dedicated pool, counts its `ready` events and expects exactly one
// once waitPoolEvents() has resolved.
it("Verify that 'ready' event is emitted", async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js',
    {
      errorHandler: (e) => console.error(e)
    }
  )
  try {
    let poolReady = 0
    pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
    await waitPoolEvents(pool1, 'ready', 1)
    expect(poolReady).toBe(1)
  } finally {
    // This pool is local to the test, so release its workers even when an
    // assertion above fails.
    await pool1.destroy()
  }
})
// Counts `busy` events on the shared pool while submitting twice as many
// tasks as there are workers.
it("Verify that 'busy' event is emitted", () => {
  let poolBusy = 0
  pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
  for (let i = 0; i < numberOfThreads * 2; i++) {
    // NOTE(review): the loop body was missing from the reviewed excerpt; a
    // task submission is what the comment below describes. Confirm against
    // the upstream file. The promise is intentionally not awaited: the
    // assertion checks the events raised synchronously by the submissions.
    pool.execute()
  }
  // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
  // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
  expect(poolBusy).toBe(numberOfThreads + 1)
})
// Floods the queue-enabled pool with `maxMultiplier` tasks per worker, then
// checks the per-worker and pool-wide task counters twice: right after
// submission (tasks executing and queued) and after every task has settled.
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfThreads * maxMultiplier
  const promises = new Set()
  for (let i = 0; i < submittedTasks; i++) {
    promises.add(queuePool.execute())
  }
  expect(promises.size).toBe(submittedTasks)

  // Read once: the same option value feeds every expectation below.
  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks per worker that cannot start immediately and must wait in its queue.
  const queuedPerWorker = maxMultiplier - concurrency

  // State right after submission, before any task has completed.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
  expect(queuePool.info.executingTasks).toBe(numberOfThreads * concurrency)
  expect(queuePool.info.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(queuePool.info.maxQueuedTasks).toBe(
    numberOfThreads * queuedPerWorker
  )

  await Promise.all(promises)

  // State once every submitted task has settled: nothing executing or queued,
  // while the queue high-water mark is retained.
  for (const workerNode of queuePool.workerNodes) {
    const { tasks } = workerNode.usage
    expect(tasks.executing).toBe(0)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
  }
})
// A task run by the "empty" worker pool must resolve with `undefined`.
it('Verify that is possible to have a worker that return undefined', async () => {
  const result = await emptyPool.execute()
  expect(result).toBeUndefined()
})
// The "echo" worker pool must resolve with a value strictly equal to the
// payload it was given.
it('Verify that data are sent to the worker correctly', async () => {
  const data = { f: 10 }
  const result = await echoPool.execute(data)
  expect(result).toStrictEqual(data)
})
// Exercises the third argument of pool.execute(), an array of objects handed
// over as a transfer list, with a valid and then an invalid entry.
// NOTE(review): this excerpt is incomplete. The declarations of `result` and
// `error`, the code that assigns `error` (presumably try/catch around each
// call), at least one transfer-list element and the closing brackets are not
// visible here. Restore them from the upstream file before editing.
158 it('Verify that transferable objects are sent to the worker correctly', async () => {
// First call: the transfer list contains a MessageChannel port.
162 result
= await pool
.execute(undefined, undefined, [
164 new MessageChannel().port1
// The task is expected to succeed and no error to have been recorded.
169 expect(result
).toStrictEqual({ ok
: 1 })
170 expect(error
).toBeUndefined()
// Second call: the transfer list contains a SharedArrayBuffer.
172 result
= await pool
.execute(undefined, undefined, [
173 new SharedArrayBuffer(16)
// `result` is expected to still equal `{ ok: 1 }`, while `error` must now be
// the TypeError below.
178 expect(result
).toStrictEqual({ ok
: 1 })
179 expect(error
).toStrictEqual(
180 new TypeError('Found invalid object in transferList')
// Runs a task on the "error" worker pool and checks both the error seen by
// the caller (`inError`) and the payload of the pool's `taskError` event.
// NOTE(review): this excerpt is incomplete. The declarations of `taskError`
// and `inError`, the body of the event listener, the code that captures the
// failure of execute() (presumably try/catch), some properties of the
// expected event object and the closing brackets are not visible here.
184 it('Verify that error handling is working properly:sync', async () => {
185 const data
= { f
: 10 }
// Listen for the pool-level task error event.
187 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
// Submit the task to the error worker pool.
192 await errorPool
.execute(data
)
// The caller-side error must be an Error carrying the worker's message.
196 expect(inError
).toBeDefined()
197 expect(inError
).toBeInstanceOf(Error
)
198 expect(inError
.message
).toBeDefined()
199 expect(typeof inError
.message
=== 'string').toBe(true)
200 expect(inError
.message
).toBe('Error Message from ThreadWorker')
// The event payload must carry the same error in its `message` property.
201 expect(taskError
).toStrictEqual({
203 message
: new Error('Error Message from ThreadWorker'),
// Looks for a worker node whose failed-task counter equals 1.
207 errorPool
.workerNodes
.some(
208 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Runs a task on the "async error" worker pool and checks both the error seen
// by the caller (`inError`) and the payload of the pool's `taskError` event.
// NOTE(review): this excerpt is incomplete. The declarations of `taskError`
// and `inError`, the body of the event listener, the code that captures the
// failure of execute() (presumably try/catch), some properties of the
// expected event object and the closing brackets are not visible here.
213 it('Verify that error handling is working properly:async', async () => {
214 const data
= { f
: 10 }
// Listen for the pool-level task error event.
216 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
// Submit the task to the async error worker pool.
221 await asyncErrorPool
.execute(data
)
// The caller-side error must be an Error carrying the worker's message.
225 expect(inError
).toBeDefined()
226 expect(inError
).toBeInstanceOf(Error
)
227 expect(inError
.message
).toBeDefined()
228 expect(typeof inError
.message
=== 'string').toBe(true)
229 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
// The event payload must carry the same error in its `message` property.
230 expect(taskError
).toStrictEqual({
232 message
: new Error('Error Message from ThreadWorker:async'),
// Looks for a worker node whose failed-task counter equals 1.
236 asyncErrorPool
.workerNodes
.some(
237 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
// Times one task on the "async" worker pool: the payload must come back
// unchanged and the round trip must take at least 2000 ms.
it('Verify that async function is working properly', async () => {
  const data = { f: 10 }
  const startTime = performance.now()
  const result = await asyncPool.execute(data)
  const usedTime = performance.now() - startTime
  expect(result).toStrictEqual(data)
  expect(usedTime).toBeGreaterThanOrEqual(2000)
})
// Shuts the shared pool down and expects one `exit` event per worker thread.
it('Shutdown test', async () => {
  // Start listening before the shutdown so no exit event is missed.
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
  // NOTE(review): the statement between creating and awaiting `exitPromise`
  // was missing from the reviewed excerpt. Destroying `pool` is assumed
  // because the suite teardown does not destroy it. Confirm against the
  // upstream file.
  await pool.destroy()
  const numberOfExitEvents = await exitPromise
  expect(numberOfExitEvents).toBe(numberOfThreads)
})
// Checks how `workerOptions` is exposed on `pool.opts`: undefined when no
// options are given, and equal to the supplied value otherwise.
// NOTE(review): this excerpt is incomplete. Part of the options object passed
// to the second pool, part of the expected `workerOptions` object and the
// closing brackets are not visible here.
258 it('Verify that thread pool options are checked', async () => {
259 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
// Pool created without options: no worker options are expected.
260 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
261 expect(pool1
.opts
.workerOptions
).toBeUndefined()
// Release the first pool before the variable is reused.
262 await pool1
.destroy()
// Pool created with an options object that includes an `env` entry.
263 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
265 env
: { TEST
: 'test' },
269 expect(pool1
.opts
.workerOptions
).toStrictEqual({
270 env
: { TEST
: 'test' },
// Release the second pool as well.
273 await pool1
.destroy()
// A pool created without an options argument must still execute tasks.
it('Should work even without opts in input', async () => {
  const pool1 = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/testWorker.js'
  )
  try {
    const res = await pool1.execute()
    expect(res).toStrictEqual({ ok: 1 })
  } finally {
    // We need to clean up the resources after our test, including when the
    // assertion above fails.
    await pool1.destroy()
  }
})
// Constructing a fixed pool with zero workers must throw synchronously with
// the message below.
it('Verify that a pool with zero worker fails', () => {
  expect(
    () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
  ).toThrow('Cannot instantiate a fixed pool with zero worker')
})