chore(deps-dev): apply updates
[poolifier.git] / tests / pools / thread / fixed.test.mjs
... / ...
CommitLineData
1import { expect } from 'expect'
2
3import { FixedThreadPool, PoolEvents } from '../../../lib/index.cjs'
4import { DEFAULT_TASK_NAME } from '../../../lib/utils.cjs'
5import { TaskFunctions } from '../../test-types.cjs'
6import { waitWorkerEvents } from '../../test-utils.cjs'
7
8describe('Fixed thread pool test suite', () => {
// Number of worker threads every fixed pool in this suite is created with
const numberOfThreads = 6
// Tasks queue concurrency given to the queue-enabled pool
const tasksConcurrency = 2
// Pools shared by the test cases; they are assigned in the 'Create pools' hook
let asyncErrorPool
let asyncPool
let echoPool
let emptyPool
let errorPool
let pool
let queuePool
12
before('Create pools', () => {
  // Worker file used by both the default pool and the queue-enabled pool
  const testWorkerFile = './tests/worker-files/thread/testWorker.mjs'

  // Default pool: the error handler prints to stderr
  const poolOptions = {
    errorHandler: error => console.error(error),
  }
  pool = new FixedThreadPool(numberOfThreads, testWorkerFile, poolOptions)

  // Same worker file, with the tasks queue enabled and a bounded concurrency
  const queuePoolOptions = {
    enableTasksQueue: true,
    errorHandler: error => console.error(error),
    tasksQueueOptions: {
      concurrency: tasksConcurrency,
    },
  }
  queuePool = new FixedThreadPool(
    numberOfThreads,
    testWorkerFile,
    queuePoolOptions
  )

  // Pool on the empty worker file; the exit handler logs an informational message
  const emptyPoolOptions = {
    exitHandler: () => console.info('empty pool worker exited'),
  }
  emptyPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/emptyWorker.mjs',
    emptyPoolOptions
  )

  // Pool on the echo worker file, created with default options
  echoPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/echoWorker.mjs'
  )

  // Pools on the sync and async error worker files, used by the error handling tests
  const errorPoolOptions = {
    errorHandler: error => console.error(error),
  }
  errorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/errorWorker.mjs',
    errorPoolOptions
  )
  const asyncErrorPoolOptions = {
    errorHandler: error => console.error(error),
  }
  asyncErrorPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncErrorWorker.mjs',
    asyncErrorPoolOptions
  )

  // Pool on the async worker file, created with default options
  asyncPool = new FixedThreadPool(
    numberOfThreads,
    './tests/worker-files/thread/asyncWorker.mjs'
  )
})
60
after('Destroy pools', async () => {
  // Release the resources held by the pools once the tests are done.
  // `pool` is not listed here: the 'Shutdown test' case destroys it itself.
  const remainingPools = [
    echoPool,
    asyncPool,
    errorPool,
    asyncErrorPool,
    emptyPool,
    queuePool,
  ]
  // Destroy one pool at a time, in the listed order
  for (const remainingPool of remainingPools) {
    await remainingPool.destroy()
  }
})
70
it('Verify that the function is executed in a worker thread', async () => {
  // The task data carries a `function` field set to a TaskFunctions member;
  // presumably the test worker dispatches on it (worker file not shown here)
  const fibonacciResult = await pool.execute({
    function: TaskFunctions.fibonacci,
  })
  expect(fibonacciResult).toBe(354224848179262000000)

  const factorialResult = await pool.execute({
    function: TaskFunctions.factorial,
  })
  expect(factorialResult).toBe(9.33262154439441e157)
})
81
it('Verify that is possible to invoke the execute() method without input', async () => {
  // execute() is called with no task data and must still resolve to { ok: 1 }
  await expect(pool.execute()).resolves.toStrictEqual({ ok: 1 })
})
86
it('Verify that tasks queuing is working', async () => {
  const maxMultiplier = 3 // Must be greater than tasksConcurrency
  const submittedTasks = numberOfThreads * maxMultiplier

  // Submit every task synchronously so the usage counters can be inspected
  // before this test yields to the event loop
  const pending = new Set()
  let submitted = 0
  while (submitted < submittedTasks) {
    pending.add(queuePool.execute())
    submitted++
  }
  expect(pending.size).toBe(submittedTasks)

  const { concurrency } = queuePool.opts.tasksQueueOptions
  // Tasks each worker node is expected to hold in its queue beyond its concurrency
  const queuedPerWorker = maxMultiplier - concurrency

  // Per worker node usage while the tasks are still in flight
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(concurrency)
    expect(tasks.executed).toBe(0)
    expect(tasks.queued).toBe(queuedPerWorker)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBe(0)
    expect(tasks.stolen).toBe(0)
  }

  // Pool-wide figures while the tasks are still in flight
  const inFlightInfo = queuePool.info
  expect(inFlightInfo.executedTasks).toBe(0)
  expect(inFlightInfo.executingTasks).toBe(numberOfThreads * concurrency)
  expect(inFlightInfo.queuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(inFlightInfo.maxQueuedTasks).toBe(numberOfThreads * queuedPerWorker)
  expect(inFlightInfo.backPressure).toBe(false)
  expect(inFlightInfo.stolenTasks).toBe(0)

  await Promise.all(pending)

  // Per worker node usage once every task has settled; the stealing counters
  // are only bounded, not pinned to an exact value
  for (const { usage } of queuePool.workerNodes) {
    const { tasks } = usage
    expect(tasks.executing).toBeGreaterThanOrEqual(0)
    expect(tasks.executing).toBeLessThanOrEqual(submittedTasks)
    expect(tasks.executed).toBe(maxMultiplier)
    expect(tasks.queued).toBe(0)
    expect(tasks.maxQueued).toBe(queuedPerWorker)
    expect(tasks.sequentiallyStolen).toBeGreaterThanOrEqual(0)
    expect(tasks.sequentiallyStolen).toBeLessThanOrEqual(submittedTasks)
    expect(tasks.stolen).toBeGreaterThanOrEqual(0)
    expect(tasks.stolen).toBeLessThanOrEqual(submittedTasks)
  }

  // Pool-wide figures once every task has settled
  const settledInfo = queuePool.info
  expect(settledInfo.executedTasks).toBe(submittedTasks)
  expect(settledInfo.backPressure).toBe(false)
  expect(settledInfo.stolenTasks).toBeGreaterThanOrEqual(0)
  expect(settledInfo.stolenTasks).toBeLessThanOrEqual(submittedTasks)
})
152
it('Verify that is possible to have a worker that return undefined', async () => {
  // The pool built on the empty worker file must resolve its task to undefined
  await expect(emptyPool.execute()).resolves.toBeUndefined()
})
157
it('Verify that data are sent to the worker correctly', async () => {
  // The echo pool is expected to hand back a value strictly equal to what it was given
  const payload = { f: 10 }
  const echoed = await echoPool.execute(payload)
  expect(echoed).toStrictEqual(payload)
})
163
it('Verify that transferable objects are sent to the worker correctly', async () => {
  // Valid transfer list (ArrayBuffer + MessagePort): the task must complete normally
  let validResult
  let validError
  try {
    validResult = await pool.execute(undefined, undefined, [
      new ArrayBuffer(16),
      new MessageChannel().port1,
    ])
  } catch (e) {
    validError = e
  }
  expect(validResult).toStrictEqual({ ok: 1 })
  expect(validError).toBeUndefined()

  // Invalid transfer list (SharedArrayBuffer): execute() must reject.
  // Dedicated variables are used so the outcome of the first call cannot leak
  // into these assertions: a rejected call must leave its result unset.
  let invalidResult
  let invalidError
  try {
    invalidResult = await pool.execute(undefined, undefined, [
      new SharedArrayBuffer(16),
    ])
  } catch (e) {
    invalidError = e
  }
  expect(invalidResult).toBeUndefined()
  expect(invalidError).toBeInstanceOf(Error)
  expect(invalidError.message).toMatch(
    /Found invalid (object|value) in transferList/
  )
})
190
it('Verify that error handling is working properly:sync', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker'

  // No listener is registered on the pool emitter before this test adds its own
  const { emitter } = errorPool
  expect(emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(emitter.eventNames()).toStrictEqual([PoolEvents.taskError])

  // The task is expected to reject; keep the rejection reason for inspection
  let caughtError
  try {
    await errorPool.execute(payload)
  } catch (error) {
    caughtError = error
  }
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toStrictEqual(expectedMessage)

  // The taskError event payload carries the task data, the error and the task name
  expect(emittedTaskError).toStrictEqual({
    data: payload,
    message: new Error(expectedMessage),
    name: DEFAULT_TASK_NAME,
  })

  // At least one worker node must have recorded exactly one failed task
  const hasWorkerWithOneFailure = errorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
218
it('Verify that error handling is working properly:async', async () => {
  const payload = { f: 10 }
  const expectedMessage = 'Error Message from ThreadWorker:async'

  // No listener is registered on the pool emitter before this test adds its own
  const { emitter } = asyncErrorPool
  expect(emitter.eventNames()).toStrictEqual([])
  let emittedTaskError
  emitter.on(PoolEvents.taskError, event => {
    emittedTaskError = event
  })
  expect(emitter.eventNames()).toStrictEqual([PoolEvents.taskError])

  // The task is expected to reject; keep the rejection reason for inspection
  let caughtError
  try {
    await asyncErrorPool.execute(payload)
  } catch (error) {
    caughtError = error
  }
  expect(caughtError).toBeInstanceOf(Error)
  expect(caughtError.message).toStrictEqual(expectedMessage)

  // The taskError event payload carries the task data, the error and the task name
  expect(emittedTaskError).toStrictEqual({
    data: payload,
    message: new Error(expectedMessage),
    name: DEFAULT_TASK_NAME,
  })

  // At least one worker node must have recorded exactly one failed task
  const hasWorkerWithOneFailure = asyncErrorPool.workerNodes.some(
    ({ usage }) => usage.tasks.failed === 1
  )
  expect(hasWorkerWithOneFailure).toBe(true)
})
250
it('Verify that async function is working properly', async () => {
  const payload = { f: 10 }
  // Measure the wall-clock time spent waiting for the task to resolve
  const startedAt = performance.now()
  const echoed = await asyncPool.execute(payload)
  const elapsed = performance.now() - startedAt
  expect(echoed).toStrictEqual(payload)
  // NOTE(review): the 2000 ms lower bound presumably mirrors a delay inside
  // asyncWorker.mjs; confirm against the worker file
  expect(elapsed).toBeGreaterThanOrEqual(2000)
})
259
it('Shutdown test', async () => {
  // Start waiting for the workers' 'exit' events before the pool is destroyed
  const exitPromise = waitWorkerEvents(pool, 'exit', numberOfThreads)

  // Count how many times the pool emits its destroy event
  expect(pool.emitter.eventNames()).toStrictEqual([])
  let destroyEventCount = 0
  pool.emitter.on(PoolEvents.destroy, () => {
    destroyEventCount += 1
  })
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])

  await pool.destroy()
  const exitEventCount = await exitPromise

  // State expected from a destroyed pool
  const { ready, started } = pool.info
  expect(started).toBe(false)
  expect(ready).toBe(false)
  expect(pool.emitter.eventNames()).toStrictEqual([PoolEvents.destroy])
  expect(pool.readyEventEmitted).toBe(false)
  expect(pool.busyEventEmitted).toBe(false)
  expect(pool.backPressureEventEmitted).toBe(false)
  expect(pool.workerNodes.length).toBe(0)

  // One exit event per worker thread and a single destroy event
  expect(exitEventCount).toBe(numberOfThreads)
  expect(destroyEventCount).toBe(1)
})
278
it('Verify that thread pool options are checked', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'

  // Without options, the pool must not expose any worker options.
  // try/finally guarantees the pool is destroyed even when the assertion
  // fails, so no worker thread outlives a failing test.
  const defaultPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  try {
    expect(defaultPool.opts.workerOptions).toBeUndefined()
  } finally {
    await defaultPool.destroy()
  }

  // Worker options passed at construction must be exposed unchanged on the pool
  const customPool = new FixedThreadPool(numberOfThreads, workerFilePath, {
    workerOptions: {
      env: { TEST: 'test' },
      name: 'test',
    },
  })
  try {
    expect(customPool.opts.workerOptions).toStrictEqual({
      env: { TEST: 'test' },
      name: 'test',
    })
  } finally {
    await customPool.destroy()
  }
})
296
it('Verify destroyWorkerNode()', async () => {
  const workerFilePath = './tests/worker-files/thread/testWorker.mjs'
  // Dedicated pool for this test, named so it does not shadow the suite-level `pool`
  const localPool = new FixedThreadPool(numberOfThreads, workerFilePath)
  // try/finally guarantees the pool is destroyed even when an assertion fails,
  // so no worker thread outlives a failing test
  try {
    const workerNodeKey = 0
    // Count the 'exit' events emitted by the worker that is about to be destroyed
    let exitEvent = 0
    localPool.workerNodes[workerNodeKey].worker.on('exit', () => {
      ++exitEvent
    })
    await expect(
      localPool.destroyWorkerNode(workerNodeKey)
    ).resolves.toBeUndefined()
    expect(exitEvent).toBe(1)
    // Destroying a worker node directly simulates an illegitimate worker node
    // removal: the pool must still hold its guaranteed number of worker nodes
    expect(localPool.workerNodes.length).toBe(numberOfThreads)
  } finally {
    await localPool.destroy()
  }
})
311
it('Verify that a pool with zero worker fails', () => {
  // Constructing a fixed pool with zero workers must throw
  const createZeroWorkerPool = () =>
    new FixedThreadPool(0, './tests/worker-files/thread/testWorker.mjs')
  expect(createZeroWorkerPool).toThrow(
    'Cannot instantiate a fixed pool with zero worker'
  )
})
317})