1 import { expect } from 'expect'
3 import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
4 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5 import { TaskFunctions } from '../../test-types.cjs'
6 import { waitWorkerEvents } from '../../test-utils.cjs'
8 describe('Fixed thread pool test suite', () => {
9 const numberOfThreads = 6
10 const tasksConcurrency = 2
11 let asyncErrorPool, asyncPool, echoPool, emptyPool, errorPool, pool, queuePool
13 before('Create pools', () => {
14 pool = new FixedThreadPool(
16 './tests/worker-files/thread/testWorker.mjs',
18 errorHandler: e => console.error(e),
21 queuePool = new FixedThreadPool(
23 './tests/worker-files/thread/testWorker.mjs',
25 enableTasksQueue: true,
26 errorHandler: e => console.error(e),
28 concurrency: tasksConcurrency,
32 emptyPool = new FixedThreadPool(
34 './tests/worker-files/thread/emptyWorker.mjs',
35 { exitHandler: () => console.info('empty pool worker exited') }
37 echoPool = new FixedThreadPool(
39 './tests/worker-files/thread/echoWorker.mjs'
41 errorPool = new FixedThreadPool(
43 './tests/worker-files/thread/errorWorker.mjs',
45 errorHandler: e => console.error(e),
48 asyncErrorPool = new FixedThreadPool(
50 './tests/worker-files/thread/asyncErrorWorker.mjs',
52 errorHandler: e => console.error(e),
55 asyncPool = new FixedThreadPool(
57 './tests/worker-files/thread/asyncWorker.mjs'
61 after('Destroy pools', async () => {
62 // We need to clean up the resources after our tests
63 await echoPool.destroy()
64 await asyncPool.destroy()
65 await errorPool.destroy()
66 await asyncErrorPool.destroy()
67 await emptyPool.destroy()
68 await queuePool.destroy()
71 it('Verify that the function is executed in a worker thread', async () => {
72 let result = await pool.execute({
73 function: TaskFunctions.fibonacci,
75 expect(result).toBe(354224848179262000000)
76 result = await pool.execute({
77 function: TaskFunctions.factorial,
79 expect(result).toBe(9.33262154439441e157)
82 it('Verify that is possible to invoke the execute() method without input', async () => {
83 const result = await pool.execute()
84 expect(result).toStrictEqual({ ok: 1 })
87 it('Verify that tasks queuing is working', async () => {
88 const promises = new Set()
89 const maxMultiplier = 3 // Must be greater than tasksConcurrency
90 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
91 promises.add(queuePool.execute())
93 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
94 for (const workerNode of queuePool.workerNodes) {
95 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
96 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
97 queuePool.opts.tasksQueueOptions.concurrency
99 expect(workerNode.usage.tasks.executed).toBe(0)
100 expect(workerNode.usage.tasks.queued).toBe(
101 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
103 expect(workerNode.usage.tasks.maxQueued).toBe(
104 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
106 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
107 expect(workerNode.usage.tasks.stolen).toBe(0)
109 expect(queuePool.info.executedTasks).toBe(0)
110 expect(queuePool.info.executingTasks).toBe(
111 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
113 expect(queuePool.info.queuedTasks).toBe(
115 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
117 expect(queuePool.info.maxQueuedTasks).toBe(
119 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
121 expect(queuePool.info.backPressure).toBe(false)
122 expect(queuePool.info.stolenTasks).toBe(0)
123 await Promise.all(promises)
124 for (const workerNode of queuePool.workerNodes) {
125 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
126 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
127 numberOfThreads * maxMultiplier
129 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
130 expect(workerNode.usage.tasks.queued).toBe(0)
131 expect(workerNode.usage.tasks.maxQueued).toBe(
132 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
134 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
137 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
138 numberOfThreads * maxMultiplier
140 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
141 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
142 numberOfThreads * maxMultiplier
145 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
146 expect(queuePool.info.backPressure).toBe(false)
147 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
148 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
149 numberOfThreads * maxMultiplier
153 it('Verify that is possible to have a worker that return undefined', async () => {
154 const result = await emptyPool.execute()
155 expect(result).toBeUndefined()
158 it('Verify that data are sent to the worker correctly', async () => {
159 const data = { f: 10 }
160 const result = await echoPool.execute(data)
161 expect(result).toStrictEqual(data)
164 it('Verify that transferable objects are sent to the worker correctly', async () => {
168 result = await pool.execute(undefined, undefined, [
170 new MessageChannel().port1,
175 expect(result).toStrictEqual({ ok: 1 })
176 expect(error).toBeUndefined()
178 result = await pool.execute(undefined, undefined, [
179 new SharedArrayBuffer(16),
184 expect(result).toStrictEqual({ ok: 1 })
185 expect(error).toBeInstanceOf(Error)
186 expect(error.message).toMatch(
187 /Found invalid (object|value) in transferList/
191 it('Verify that error handling is working properly:sync', async () => {
192 const data = { f: 10 }
193 expect(errorPool.emitter.eventNames()).toStrictEqual([])
195 errorPool.emitter.on(PoolEvents.taskError, e => {
198 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
201 await errorPool.execute(data)
205 expect(inError).toBeInstanceOf(Error)
206 expect(inError.message).toStrictEqual('Error Message from ThreadWorker')
207 expect(taskError).toStrictEqual({
209 message: new Error('Error Message from ThreadWorker'),
210 name: DEFAULT_TASK_NAME,
213 errorPool.workerNodes.some(
214 workerNode => workerNode.usage.tasks.failed === 1
219 it('Verify that error handling is working properly:async', async () => {
220 const data = { f: 10 }
221 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
223 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
226 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
227 PoolEvents.taskError,
231 await asyncErrorPool.execute(data)
235 expect(inError).toBeInstanceOf(Error)
236 expect(inError.message).toStrictEqual(
237 'Error Message from ThreadWorker:async'
239 expect(taskError).toStrictEqual({
241 message: new Error('Error Message from ThreadWorker:async'),
242 name: DEFAULT_TASK_NAME,
245 asyncErrorPool.workerNodes.some(
246 workerNode => workerNode.usage.tasks.failed === 1
251 it('Verify that async function is working properly', async () => {
252 const data = { f: 10 }
253 const startTime = performance.now()
254 const result = await asyncPool.execute(data)
255 const usedTime = performance.now() - startTime
256 expect(result).toStrictEqual(data)
257 expect(usedTime).toBeGreaterThanOrEqual(2000)
260 it('Shutdown test', async () => {
261 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
262 expect(pool.emitter.eventNames()).toStrictEqual([])
264 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
265 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
267 const numberOfExitEvents = await exitPromise
268 expect(pool.info.started).toBe(false)
269 expect(pool.info.ready).toBe(false)
270 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
271 expect(pool.readyEventEmitted).toBe(false)
272 expect(pool.busyEventEmitted).toBe(false)
273 expect(pool.backPressureEventEmitted).toBe(false)
274 expect(pool.workerNodes.length).toBe(0)
275 expect(numberOfExitEvents).toBe(numberOfThreads)
276 expect(poolDestroy).toBe(1)
279 it('Verify that thread pool options are checked', async () => {
280 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
281 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
282 expect(pool.opts.workerOptions).toBeUndefined()
284 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
286 env: { TEST: 'test' },
290 expect(pool.opts.workerOptions).toStrictEqual({
291 env: { TEST: 'test' },
297 it('Verify destroyWorkerNode()', async () => {
298 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
299 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
300 const workerNodeKey = 0
302 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
305 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
306 expect(exitEvent).toBe(1)
307 // Simulates an illegitimate worker node destroy and the minimum number of worker nodes is guaranteed
308 expect(pool.workerNodes.length).toBe(numberOfThreads)
312 it('Verify that a pool with zero worker fails', () => {
314 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
315 ).toThrow('Cannot instantiate a fixed pool with zero worker')