1 const { expect
} = require('expect')
2 const { FixedThreadPool
, PoolEvents
} = require('../../../lib')
3 const { TaskFunctions
} = require('../../test-types')
4 const { waitPoolEvents
, waitWorkerEvents
} = require('../../test-utils')
5 const { DEFAULT_TASK_NAME
} = require('../../../lib/utils')
7 describe('Fixed thread pool test suite', () => {
8 const numberOfThreads
= 6
9 const tasksConcurrency
= 2
10 const pool
= new FixedThreadPool(
12 './tests/worker-files/thread/testWorker.js',
14 errorHandler
: (e
) => console
.error(e
)
17 const queuePool
= new FixedThreadPool(
19 './tests/worker-files/thread/testWorker.js',
21 enableTasksQueue
: true,
23 concurrency
: tasksConcurrency
25 errorHandler
: (e
) => console
.error(e
)
28 const emptyPool
= new FixedThreadPool(
30 './tests/worker-files/thread/emptyWorker.js',
31 { exitHandler
: () => console
.info('empty pool worker exited') }
33 const echoPool
= new FixedThreadPool(
35 './tests/worker-files/thread/echoWorker.js'
37 const errorPool
= new FixedThreadPool(
39 './tests/worker-files/thread/errorWorker.js',
41 errorHandler
: (e
) => console
.error(e
)
44 const asyncErrorPool
= new FixedThreadPool(
46 './tests/worker-files/thread/asyncErrorWorker.js',
48 errorHandler
: (e
) => console
.error(e
)
51 const asyncPool
= new FixedThreadPool(
53 './tests/worker-files/thread/asyncWorker.js'
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool
.destroy()
59 await asyncPool
.destroy()
60 await errorPool
.destroy()
61 await asyncErrorPool
.destroy()
62 await emptyPool
.destroy()
63 await queuePool
.destroy()
66 it('Verify that the function is executed in a worker thread', async () => {
67 let result
= await pool
.execute({
68 function: TaskFunctions
.fibonacci
70 expect(result
).toBe(75025)
71 result
= await pool
.execute({
72 function: TaskFunctions
.factorial
74 expect(result
).toBe(9.33262154439441e157
)
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result
= await pool
.execute()
79 expect(result
).toStrictEqual({ ok
: 1 })
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool1
= new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.js',
87 errorHandler
: (e
) => console
.error(e
)
91 pool1
.emitter
.on(PoolEvents
.ready
, () => ++poolReady
)
92 await
waitPoolEvents(pool1
, PoolEvents
.ready
, 1)
93 expect(poolReady
).toBe(1)
96 it("Verify that 'busy' event is emitted", async () => {
97 const promises
= new Set()
99 pool
.emitter
.on(PoolEvents
.busy
, () => ++poolBusy
)
100 for (let i
= 0; i
< numberOfThreads
* 2; i
++) {
101 promises
.add(pool
.execute())
103 await Promise
.all(promises
)
104 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
105 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
106 expect(poolBusy
).toBe(numberOfThreads
+ 1)
109 it('Verify that tasks queuing is working', async () => {
110 const promises
= new Set()
111 const maxMultiplier
= 3 // Must be greater than tasksConcurrency
112 for (let i
= 0; i
< numberOfThreads
* maxMultiplier
; i
++) {
113 promises
.add(queuePool
.execute())
115 expect(promises
.size
).toBe(numberOfThreads
* maxMultiplier
)
116 for (const workerNode
of queuePool
.workerNodes
) {
117 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
118 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
119 queuePool
.opts
.tasksQueueOptions
.concurrency
121 expect(workerNode
.usage
.tasks
.executed
).toBe(0)
122 expect(workerNode
.usage
.tasks
.queued
).toBe(
123 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
125 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
126 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
128 expect(workerNode
.usage
.tasks
.stolen
).toBe(0)
130 expect(queuePool
.info
.executedTasks
).toBe(0)
131 expect(queuePool
.info
.executingTasks
).toBe(
132 numberOfThreads
* queuePool
.opts
.tasksQueueOptions
.concurrency
134 expect(queuePool
.info
.queuedTasks
).toBe(
136 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
138 expect(queuePool
.info
.maxQueuedTasks
).toBe(
140 (maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
)
142 expect(queuePool
.info
.backPressure
).toBe(false)
143 expect(queuePool
.info
.stolenTasks
).toBe(0)
144 await Promise
.all(promises
)
145 for (const workerNode
of queuePool
.workerNodes
) {
146 expect(workerNode
.usage
.tasks
.executing
).toBeGreaterThanOrEqual(0)
147 expect(workerNode
.usage
.tasks
.executing
).toBeLessThanOrEqual(
148 numberOfThreads
* maxMultiplier
150 expect(workerNode
.usage
.tasks
.executed
).toBe(maxMultiplier
)
151 expect(workerNode
.usage
.tasks
.queued
).toBe(0)
152 expect(workerNode
.usage
.tasks
.maxQueued
).toBe(
153 maxMultiplier
- queuePool
.opts
.tasksQueueOptions
.concurrency
155 expect(workerNode
.usage
.tasks
.stolen
).toBeGreaterThanOrEqual(0)
156 expect(workerNode
.usage
.tasks
.stolen
).toBeLessThanOrEqual(
157 numberOfThreads
* maxMultiplier
160 expect(queuePool
.info
.executedTasks
).toBe(numberOfThreads
* maxMultiplier
)
161 expect(queuePool
.info
.backPressure
).toBe(false)
162 expect(queuePool
.info
.stolenTasks
).toBeGreaterThanOrEqual(0)
163 expect(queuePool
.info
.stolenTasks
).toBeLessThanOrEqual(
164 numberOfThreads
* maxMultiplier
168 it('Verify that is possible to have a worker that return undefined', async () => {
169 const result
= await emptyPool
.execute()
170 expect(result
).toBeUndefined()
173 it('Verify that data are sent to the worker correctly', async () => {
174 const data
= { f
: 10 }
175 const result
= await echoPool
.execute(data
)
176 expect(result
).toStrictEqual(data
)
179 it('Verify that transferable objects are sent to the worker correctly', async () => {
183 result
= await pool
.execute(undefined, undefined, [
185 new MessageChannel().port1
190 expect(result
).toStrictEqual({ ok
: 1 })
191 expect(error
).toBeUndefined()
193 result
= await pool
.execute(undefined, undefined, [
194 new SharedArrayBuffer(16)
199 expect(result
).toStrictEqual({ ok
: 1 })
200 expect(error
).toStrictEqual(
201 new TypeError('Found invalid object in transferList')
205 it('Verify that error handling is working properly:sync', async () => {
206 const data
= { f
: 10 }
208 errorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
213 await errorPool
.execute(data
)
217 expect(inError
).toBeDefined()
218 expect(inError
).toBeInstanceOf(Error
)
219 expect(inError
.message
).toBeDefined()
220 expect(typeof inError
.message
=== 'string').toBe(true)
221 expect(inError
.message
).toBe('Error Message from ThreadWorker')
222 expect(taskError
).toStrictEqual({
223 name
: DEFAULT_TASK_NAME
,
224 message
: new Error('Error Message from ThreadWorker'),
228 errorPool
.workerNodes
.some(
229 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
234 it('Verify that error handling is working properly:async', async () => {
235 const data
= { f
: 10 }
237 asyncErrorPool
.emitter
.on(PoolEvents
.taskError
, (e
) => {
242 await asyncErrorPool
.execute(data
)
246 expect(inError
).toBeDefined()
247 expect(inError
).toBeInstanceOf(Error
)
248 expect(inError
.message
).toBeDefined()
249 expect(typeof inError
.message
=== 'string').toBe(true)
250 expect(inError
.message
).toBe('Error Message from ThreadWorker:async')
251 expect(taskError
).toStrictEqual({
252 name
: DEFAULT_TASK_NAME
,
253 message
: new Error('Error Message from ThreadWorker:async'),
257 asyncErrorPool
.workerNodes
.some(
258 (workerNode
) => workerNode
.usage
.tasks
.failed
=== 1
263 it('Verify that async function is working properly', async () => {
264 const data
= { f
: 10 }
265 const startTime
= performance
.now()
266 const result
= await asyncPool
.execute(data
)
267 const usedTime
= performance
.now() - startTime
268 expect(result
).toStrictEqual(data
)
269 expect(usedTime
).toBeGreaterThanOrEqual(2000)
272 it('Shutdown test', async () => {
273 const exitPromise
= waitWorkerEvents(pool
, 'exit', numberOfThreads
)
275 pool
.emitter
.on(PoolEvents
.destroy
, () => ++poolDestroy
)
277 const numberOfExitEvents
= await exitPromise
278 expect(numberOfExitEvents
).toBe(numberOfThreads
)
279 expect(poolDestroy
).toBe(1)
282 it('Verify that thread pool options are checked', async () => {
283 const workerFilePath
= './tests/worker-files/thread/testWorker.js'
284 let pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
)
285 expect(pool1
.opts
.workerOptions
).toBeUndefined()
286 await pool1
.destroy()
287 pool1
= new FixedThreadPool(numberOfThreads
, workerFilePath
, {
289 env
: { TEST
: 'test' },
293 expect(pool1
.opts
.workerOptions
).toStrictEqual({
294 env
: { TEST
: 'test' },
297 await pool1
.destroy()
300 it('Should work even without opts in input', async () => {
301 const pool1
= new FixedThreadPool(
303 './tests/worker-files/thread/testWorker.js'
305 const res
= await pool1
.execute()
306 expect(res
).toStrictEqual({ ok
: 1 })
307 // We need to clean up the resources after our test
308 await pool1
.destroy()
311 it('Verify that a pool with zero worker fails', async () => {
313 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
314 ).toThrowError('Cannot instantiate a fixed pool with zero worker')