)
})
- it('Verify that async kill handler is called when worker is killed', async () => {
- const killHandlerStub = stub().returns()
- const worker = new ClusterWorker(() => {}, {
- killHandler: async () => await Promise.resolve(killHandlerStub()),
- })
- worker.isMain = false
- const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
- worker.handleKillMessage()
- await sleep(10)
- expect(killHandlerStub.calledOnce).toBe(true)
- expect(sendToMainWorkerStub.calledOnce).toBe(true)
- expect(sendToMainWorkerStub.calledWith({ kill: 'success' })).toBe(true)
- })
-
it('Verify that getMainWorker() throw error if main worker is not set', () => {
expect(() =>
new StubWorkerWithMainWorker(() => {}).getMainWorker()
worker.taskFunctions.get('fn2')
)
})
+
+ it('Verify that removeTaskFunction() is working', () => {
+ const fn1 = () => {
+ return 1
+ }
+ const fn2 = () => {
+ return 2
+ }
+ const worker = new ThreadWorker({ fn1, fn2 })
+ stub(worker, 'sendToMainWorker').returns()
+ expect(worker.removeTaskFunction(0)).toStrictEqual({
+ error: new TypeError('name parameter is not a string'),
+ status: false,
+ })
+ expect(worker.removeTaskFunction('')).toStrictEqual({
+ error: new TypeError('name parameter is an empty string'),
+ status: false,
+ })
+ expect(worker.removeTaskFunction(DEFAULT_TASK_NAME)).toStrictEqual({
+ error: new Error(
+ 'Cannot remove the task function with the default reserved name'
+ ),
+ status: false,
+ })
+ expect(worker.removeTaskFunction('fn1')).toStrictEqual({
+ error: new Error(
+ 'Cannot remove the task function used as the default task function'
+ ),
+ status: false,
+ })
+ expect(worker.taskFunctions.size).toBe(3)
+ expect(worker.removeTaskFunction('fn2')).toStrictEqual({ status: true })
+ expect(worker.taskFunctions.size).toBe(2)
+ expect(worker.taskFunctions.has('fn2')).toBe(false)
+ })
+
+ describe('Message handling', () => {
+ it('Verify that messageListener() handles statistics message', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.messageListener({
+ statistics: { elu: true, runTime: true },
+ workerId: worker.id,
+ })
+ expect(worker.statistics).toStrictEqual({ elu: true, runTime: true })
+ })
+
+ it('Verify that messageListener() handles checkActive message', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.messageListener({
+ checkActive: true,
+ workerId: worker.id,
+ })
+ expect(worker.activeInterval).toBeDefined()
+ worker.messageListener({
+ checkActive: false,
+ workerId: worker.id,
+ })
+ expect(worker.activeInterval).toBeUndefined()
+ })
+
+ it('Verify that messageListener() handles kill message', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.isMain = false
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.messageListener({
+ kill: true,
+ workerId: worker.id,
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ expect(sendToMainWorkerStub.lastCall.args[0]).toMatchObject({
+ kill: 'success',
+ })
+ })
+
+ it('Verify that async kill handler is called when worker is killed', async () => {
+ const killHandlerStub = stub().returns()
+ const worker = new ClusterWorker(() => {}, {
+ killHandler: async () => await Promise.resolve(killHandlerStub()),
+ })
+ worker.isMain = false
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.handleKillMessage()
+ await sleep(10)
+ expect(killHandlerStub.calledOnce).toBe(true)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ expect(sendToMainWorkerStub.calledWith({ kill: 'success' })).toBe(true)
+ })
+
+ it('Verify that messageListener() throws on missing workerId', () => {
+ const worker = new ThreadWorker(() => {})
+ expect(() => worker.messageListener({})).toThrow(
+ /Message worker id is not set/
+ )
+ })
+
+ it('Verify that messageListener() throws on mismatched workerId', () => {
+ const worker = new ThreadWorker(() => {})
+ expect(() => worker.messageListener({ workerId: 9999 })).toThrow(
+ /Message worker id .* does not match/
+ )
+ })
+ })
+
+ describe('Task execution', () => {
+ it('Verify that run() executes sync task function', () => {
+ const worker = new ThreadWorker(data => data * 2)
+ worker.statistics = { elu: false, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: 21,
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].data).toBe(42)
+ expect(lastCall.args[0].taskId).toBe(
+ '550e8400-e29b-41d4-a716-446655440000'
+ )
+ expect(lastCall.args[0].taskPerformance).toBeDefined()
+ })
+
+ it('Verify that run() executes async task function', async () => {
+ const worker = new ThreadWorker(
+ async data => await Promise.resolve(data * 2)
+ )
+ worker.statistics = { elu: false, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: 21,
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ await sleep(10)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].data).toBe(42)
+ expect(lastCall.args[0].taskId).toBe(
+ '550e8400-e29b-41d4-a716-446655440000'
+ )
+ })
+
+ it('Verify that run() handles task function not found', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.statistics = { elu: false, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: {},
+ name: 'unknown',
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].workerError).toBeDefined()
+ expect(lastCall.args[0].workerError.message).toMatch(
+ /Task function 'unknown' not found/
+ )
+ })
+
+ it('Verify that runSync() handles task function error', () => {
+ const worker = new ThreadWorker(() => {
+ throw new Error('Task error')
+ })
+ worker.statistics = { elu: false, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: {},
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].workerError).toBeDefined()
+ expect(lastCall.args[0].workerError.message).toBe('Task error')
+ })
+
+ it('Verify that runAsync() handles task function error', async () => {
+ const worker = new ThreadWorker(async () => {
+ return await Promise.reject(new Error('Async task error'))
+ })
+ worker.statistics = { elu: false, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: {},
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ await sleep(10)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].workerError).toBeDefined()
+ expect(lastCall.args[0].workerError.message).toBe('Async task error')
+ })
+
+ it('Verify that run() with runTime statistics works', () => {
+ const worker = new ThreadWorker(data => data)
+ worker.statistics = { elu: false, runTime: true }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: 'test',
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskPerformance.runTime).toBeGreaterThanOrEqual(0)
+ })
+
+ it('Verify that run() with elu statistics works', () => {
+ const worker = new ThreadWorker(data => data)
+ worker.statistics = { elu: true, runTime: false }
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.run({
+ data: 'test',
+ name: DEFAULT_TASK_NAME,
+ taskId: '550e8400-e29b-41d4-a716-446655440000',
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskPerformance.elu).toBeDefined()
+ })
+ })
+
+ describe('Task function operations via messages', () => {
+ it('Verify that handleTaskFunctionOperationMessage() handles add operation', () => {
+ const worker = new ThreadWorker(() => {})
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.handleTaskFunctionOperationMessage({
+ taskFunction: '(data) => data * 3',
+ taskFunctionOperation: 'add',
+ taskFunctionProperties: { name: 'newFn' },
+ })
+ expect(worker.taskFunctions.has('newFn')).toBe(true)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskFunctionOperationStatus).toBe(true)
+ })
+
+ it('Verify that handleTaskFunctionOperationMessage() handles remove operation', () => {
+ const fn1 = () => 1
+ const fn2 = () => 2
+ const worker = new ThreadWorker({ fn1, fn2 })
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ expect(worker.taskFunctions.has('fn2')).toBe(true)
+ worker.handleTaskFunctionOperationMessage({
+ taskFunctionOperation: 'remove',
+ taskFunctionProperties: { name: 'fn2' },
+ })
+ expect(worker.taskFunctions.has('fn2')).toBe(false)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskFunctionOperationStatus).toBe(true)
+ })
+
+ it('Verify that handleTaskFunctionOperationMessage() handles default operation', () => {
+ const fn1 = () => 1
+ const fn2 = () => 2
+ const worker = new ThreadWorker({ fn1, fn2 })
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.handleTaskFunctionOperationMessage({
+ taskFunctionOperation: 'default',
+ taskFunctionProperties: { name: 'fn2' },
+ })
+ expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toStrictEqual(
+ worker.taskFunctions.get('fn2')
+ )
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskFunctionOperationStatus).toBe(true)
+ })
+
+ it('Verify that handleTaskFunctionOperationMessage() handles unknown operation', () => {
+ const worker = new ThreadWorker(() => {})
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.handleTaskFunctionOperationMessage({
+ taskFunctionOperation: 'unknown',
+ taskFunctionProperties: { name: 'fn' },
+ })
+ expect(sendToMainWorkerStub.called).toBe(true)
+ const lastCall = sendToMainWorkerStub.lastCall
+ expect(lastCall.args[0].taskFunctionOperationStatus).toBe(false)
+ expect(lastCall.args[0].workerError.message).toMatch(
+ /Unknown task function operation/
+ )
+ })
+
+ it('Verify that handleTaskFunctionOperationMessage() throws without properties', () => {
+ const worker = new ThreadWorker(() => {})
+ expect(() =>
+ worker.handleTaskFunctionOperationMessage({
+ taskFunctionOperation: 'add',
+ })
+ ).toThrow(
+ /Cannot handle task function operation message without task function properties/
+ )
+ })
+
+ it('Verify that handleTaskFunctionOperationMessage() throws add without function', () => {
+ const worker = new ThreadWorker(() => {})
+ expect(() =>
+ worker.handleTaskFunctionOperationMessage({
+ taskFunctionOperation: 'add',
+ taskFunctionProperties: { name: 'fn' },
+ })
+ ).toThrow(
+ /Cannot handle task function operation add message without task function/
+ )
+ })
+ })
+
+ describe('Check active mechanism', () => {
+ it('Verify that startCheckActive() starts the interval', () => {
+ const worker = new ThreadWorker(() => {})
+ expect(worker.activeInterval).toBeUndefined()
+ worker.startCheckActive()
+ expect(worker.activeInterval).toBeDefined()
+ worker.stopCheckActive()
+ })
+
+ it('Verify that stopCheckActive() stops the interval', () => {
+ const worker = new ThreadWorker(() => {})
+ worker.startCheckActive()
+ expect(worker.activeInterval).toBeDefined()
+ worker.stopCheckActive()
+ expect(worker.activeInterval).toBeUndefined()
+ })
+
+ it('Verify that checkActive() sends kill on inactivity', async () => {
+ const worker = new ThreadWorker(() => {}, { maxInactiveTime: 10 })
+ const sendToMainWorkerStub = stub(worker, 'sendToMainWorker').returns()
+ worker.startCheckActive()
+ await sleep(20)
+ expect(sendToMainWorkerStub.called).toBe(true)
+ expect(
+ sendToMainWorkerStub.calledWith({ kill: KillBehaviors.SOFT })
+ ).toBe(true)
+ worker.stopCheckActive()
+ })
+ })
})