1 import { expect } from 'expect'
2 import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
3 import { TaskFunctions } from '../../test-types.cjs'
4 import { waitPoolEvents, waitWorkerEvents } from '../../test-utils.cjs'
5 import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
7 describe('Fixed thread pool test suite', () => {
8 const numberOfThreads = 6
9 const tasksConcurrency = 2
10 const pool = new FixedThreadPool(
12 './tests/worker-files/thread/testWorker.mjs',
14 errorHandler: e => console.error(e)
17 const queuePool = new FixedThreadPool(
19 './tests/worker-files/thread/testWorker.mjs',
21 enableTasksQueue: true,
23 concurrency: tasksConcurrency
25 errorHandler: e => console.error(e)
28 const emptyPool = new FixedThreadPool(
30 './tests/worker-files/thread/emptyWorker.mjs',
31 { exitHandler: () => console.info('empty pool worker exited') }
33 const echoPool = new FixedThreadPool(
35 './tests/worker-files/thread/echoWorker.mjs'
37 const errorPool = new FixedThreadPool(
39 './tests/worker-files/thread/errorWorker.mjs',
41 errorHandler: e => console.error(e)
44 const asyncErrorPool = new FixedThreadPool(
46 './tests/worker-files/thread/asyncErrorWorker.mjs',
48 errorHandler: e => console.error(e)
51 const asyncPool = new FixedThreadPool(
53 './tests/worker-files/thread/asyncWorker.mjs'
56 after('Destroy all pools', async () => {
57 // We need to clean up the resources after our test
58 await echoPool.destroy()
59 await asyncPool.destroy()
60 await errorPool.destroy()
61 await asyncErrorPool.destroy()
62 await emptyPool.destroy()
63 await queuePool.destroy()
66 it('Verify that the function is executed in a worker thread', async () => {
67 let result = await pool.execute({
68 function: TaskFunctions.fibonacci
70 expect(result).toBe(75025)
71 result = await pool.execute({
72 function: TaskFunctions.factorial
74 expect(result).toBe(9.33262154439441e157)
77 it('Verify that is possible to invoke the execute() method without input', async () => {
78 const result = await pool.execute()
79 expect(result).toStrictEqual({ ok: 1 })
82 it("Verify that 'ready' event is emitted", async () => {
83 const pool = new FixedThreadPool(
85 './tests/worker-files/thread/testWorker.mjs',
87 errorHandler: e => console.error(e)
90 expect(pool.emitter.eventNames()).toStrictEqual([])
92 pool.emitter.on(PoolEvents.ready, () => ++poolReady)
93 await waitPoolEvents(pool, PoolEvents.ready, 1)
94 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.ready])
95 expect(poolReady).toBe(1)
99 it("Verify that 'busy' event is emitted", async () => {
100 const promises = new Set()
101 expect(pool.emitter.eventNames()).toStrictEqual([])
103 pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
104 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
105 for (let i = 0; i < numberOfThreads * 2; i++) {
106 promises.add(pool.execute())
108 await Promise.all(promises)
109 // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
110 // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
111 expect(poolBusy).toBe(numberOfThreads + 1)
114 it('Verify that tasks queuing is working', async () => {
115 const promises = new Set()
116 const maxMultiplier = 3 // Must be greater than tasksConcurrency
117 for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
118 promises.add(queuePool.execute())
120 expect(promises.size).toBe(numberOfThreads * maxMultiplier)
121 for (const workerNode of queuePool.workerNodes) {
122 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
123 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
124 queuePool.opts.tasksQueueOptions.concurrency
126 expect(workerNode.usage.tasks.executed).toBe(0)
127 expect(workerNode.usage.tasks.queued).toBe(
128 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
130 expect(workerNode.usage.tasks.maxQueued).toBe(
131 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
133 expect(workerNode.usage.tasks.sequentiallyStolen).toBe(0)
134 expect(workerNode.usage.tasks.stolen).toBe(0)
136 expect(queuePool.info.executedTasks).toBe(0)
137 expect(queuePool.info.executingTasks).toBe(
138 numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
140 expect(queuePool.info.queuedTasks).toBe(
142 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
144 expect(queuePool.info.maxQueuedTasks).toBe(
146 (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
148 expect(queuePool.info.backPressure).toBe(false)
149 expect(queuePool.info.stolenTasks).toBe(0)
150 await Promise.all(promises)
151 for (const workerNode of queuePool.workerNodes) {
152 expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
153 expect(workerNode.usage.tasks.executing).toBeLessThanOrEqual(
154 numberOfThreads * maxMultiplier
156 expect(workerNode.usage.tasks.executed).toBe(maxMultiplier)
157 expect(workerNode.usage.tasks.queued).toBe(0)
158 expect(workerNode.usage.tasks.maxQueued).toBe(
159 maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
161 expect(workerNode.usage.tasks.sequentiallyStolen).toBeGreaterThanOrEqual(
164 expect(workerNode.usage.tasks.sequentiallyStolen).toBeLessThanOrEqual(
165 numberOfThreads * maxMultiplier
167 expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
168 expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
169 numberOfThreads * maxMultiplier
172 expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
173 expect(queuePool.info.backPressure).toBe(false)
174 expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
175 expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
176 numberOfThreads * maxMultiplier
180 it('Verify that is possible to have a worker that return undefined', async () => {
181 const result = await emptyPool.execute()
182 expect(result).toBeUndefined()
185 it('Verify that data are sent to the worker correctly', async () => {
186 const data = { f: 10 }
187 const result = await echoPool.execute(data)
188 expect(result).toStrictEqual(data)
191 it('Verify that transferable objects are sent to the worker correctly', async () => {
195 result = await pool.execute(undefined, undefined, [
197 new MessageChannel().port1
202 expect(result).toStrictEqual({ ok: 1 })
203 expect(error).toBeUndefined()
205 result = await pool.execute(undefined, undefined, [
206 new SharedArrayBuffer(16)
211 expect(result).toStrictEqual({ ok: 1 })
212 expect(error).toBeInstanceOf(Error)
213 expect(error.message).toMatch(
214 /Found invalid (object|value) in transferList/
218 it('Verify that error handling is working properly:sync', async () => {
219 const data = { f: 10 }
220 expect(errorPool.emitter.eventNames()).toStrictEqual([])
222 errorPool.emitter.on(PoolEvents.taskError, e => {
225 expect(errorPool.emitter.eventNames()).toStrictEqual([PoolEvents.taskError])
228 await errorPool.execute(data)
232 expect(inError).toBeDefined()
233 expect(inError).toBeInstanceOf(Error)
234 expect(inError.message).toBeDefined()
235 expect(typeof inError.message === 'string').toBe(true)
236 expect(inError.message).toBe('Error Message from ThreadWorker')
237 expect(taskError).toStrictEqual({
238 name: DEFAULT_TASK_NAME,
239 message: new Error('Error Message from ThreadWorker'),
243 errorPool.workerNodes.some(
244 workerNode => workerNode.usage.tasks.failed === 1
249 it('Verify that error handling is working properly:async', async () => {
250 const data = { f: 10 }
251 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([])
253 asyncErrorPool.emitter.on(PoolEvents.taskError, e => {
256 expect(asyncErrorPool.emitter.eventNames()).toStrictEqual([
261 await asyncErrorPool.execute(data)
265 expect(inError).toBeDefined()
266 expect(inError).toBeInstanceOf(Error)
267 expect(inError.message).toBeDefined()
268 expect(typeof inError.message === 'string').toBe(true)
269 expect(inError.message).toBe('Error Message from ThreadWorker:async')
270 expect(taskError).toStrictEqual({
271 name: DEFAULT_TASK_NAME,
272 message: new Error('Error Message from ThreadWorker:async'),
276 asyncErrorPool.workerNodes.some(
277 workerNode => workerNode.usage.tasks.failed === 1
282 it('Verify that async function is working properly', async () => {
283 const data = { f: 10 }
284 const startTime = performance.now()
285 const result = await asyncPool.execute(data)
286 const usedTime = performance.now() - startTime
287 expect(result).toStrictEqual(data)
288 expect(usedTime).toBeGreaterThanOrEqual(2000)
291 it('Shutdown test', async () => {
292 const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)
293 expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.busy])
295 pool.emitter.on(PoolEvents.destroy, () => ++poolDestroy)
296 expect(pool.emitter.eventNames()).toStrictEqual([
301 const numberOfExitEvents = await exitPromise
302 expect(pool.started).toBe(false)
303 expect(pool.readyEventEmitted).toBe(false)
304 expect(pool.emitter.eventNames()).toStrictEqual([])
305 expect(pool.workerNodes.length).toBe(0)
306 expect(numberOfExitEvents).toBe(numberOfThreads)
307 expect(poolDestroy).toBe(1)
310 it('Verify that thread pool options are checked', async () => {
311 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
312 let pool = new FixedThreadPool(numberOfThreads, workerFilePath)
313 expect(pool.opts.workerOptions).toBeUndefined()
315 pool = new FixedThreadPool(numberOfThreads, workerFilePath, {
317 env: { TEST: 'test' },
321 expect(pool.opts.workerOptions).toStrictEqual({
322 env: { TEST: 'test' },
328 it('Should work even without opts in input', async () => {
329 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
330 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
331 const res = await pool.execute()
332 expect(res).toStrictEqual({ ok: 1 })
333 // We need to clean up the resources after our test
337 it('Verify destroyWorkerNode()', async () => {
338 const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
339 const pool = new FixedThreadPool(numberOfThreads, workerFilePath)
340 const workerNodeKey = 0
342 pool.workerNodes[workerNodeKey].worker.on('exit', () => {
345 await expect(pool.destroyWorkerNode(workerNodeKey)).resolves.toBeUndefined()
346 expect(exitEvent).toBe(1)
347 expect(pool.workerNodes.length).toBe(numberOfThreads - 1)
351 it('Verify that a pool with zero worker fails', () => {
353 () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
354 ).toThrow('Cannot instantiate a fixed pool with zero worker')