From 6e9d10db05ac2bbc85373195a5c885d2492fee61 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 22 Mar 2021 11:58:03 +0100 Subject: [PATCH] Avoid to on-by-one in worker function. (#285) --- benchmarks/internal/cluster/worker.js | 2 +- benchmarks/internal/thread/worker.js | 2 +- .../versus-external-pools/functions/function-to-bench.js | 2 +- src/pools/abstract-pool.ts | 8 +------- src/pools/cluster/fixed.ts | 5 +++++ src/pools/thread/fixed.ts | 4 ++++ src/utils.ts | 6 ++++++ src/worker/abstract-worker.ts | 7 +++++-- src/worker/cluster-worker.ts | 2 ++ src/worker/thread-worker.ts | 1 + tests/worker-files/cluster/testWorker.js | 2 +- tests/worker-files/thread/testWorker.js | 2 +- 12 files changed, 29 insertions(+), 14 deletions(-) create mode 100644 src/utils.ts diff --git a/benchmarks/internal/cluster/worker.js b/benchmarks/internal/cluster/worker.js index 1692c284..b4d54fca 100644 --- a/benchmarks/internal/cluster/worker.js +++ b/benchmarks/internal/cluster/worker.js @@ -2,7 +2,7 @@ const { ClusterWorker } = require('../../../lib/index') function yourFunction (data) { - for (let i = 0; i <= 1000; i++) { + for (let i = 0; i < 1000; i++) { const o = { a: i } diff --git a/benchmarks/internal/thread/worker.js b/benchmarks/internal/thread/worker.js index 2ec5f4c2..f5f36ed8 100644 --- a/benchmarks/internal/thread/worker.js +++ b/benchmarks/internal/thread/worker.js @@ -2,7 +2,7 @@ const { ThreadWorker } = require('../../../lib/index') function yourFunction (data) { - for (let i = 0; i <= 1000; i++) { + for (let i = 0; i < 1000; i++) { const o = { a: i } diff --git a/benchmarks/versus-external-pools/functions/function-to-bench.js b/benchmarks/versus-external-pools/functions/function-to-bench.js index 48a0fc78..565f8112 100644 --- a/benchmarks/versus-external-pools/functions/function-to-bench.js +++ b/benchmarks/versus-external-pools/functions/function-to-bench.js @@ -1,7 +1,7 @@ module.exports = function (data) { if (data.taskType === 'CPU_INTENSIVE') { // CPU intensive task - for (let i = 0; i <= 5000; i++) { + for (let i = 0; i < 5000; i++) { const o = { a: i } diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 54447c82..c3f293e7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import type { MessageValue, PromiseWorkerResponseWrapper } from '../utility-types' +import { EMPTY_FUNCTION } from '../utils' import { isKillBehavior, KillBehaviors } from '../worker/worker-options' import type { IPoolInternal } from './pool-internal' import { PoolEmitter, PoolType } from './pool-internal' @@ -11,13 +12,6 @@ import { WorkerChoiceStrategyContext } from './selection-strategies' -/** - * An intentional empty function. - */ -const EMPTY_FUNCTION: () => void = () => { - /* Intentionally empty */ -} - /** * Callback invoked if the worker raised an error. */ diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index b522947f..e1caf252 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -49,12 +49,14 @@ export class FixedClusterPool< super(numberOfWorkers, filePath, opts) } + /** @inheritdoc */ protected setupHook (): void { setupMaster({ exec: this.filePath }) } + /** @inheritdoc */ protected isMain (): boolean { return isMaster } @@ -65,6 +67,7 @@ export class FixedClusterPool< worker.kill() } + /** @inheritdoc */ protected sendToWorker (worker: Worker, message: MessageValue): void { worker.send(message) } @@ -77,10 +80,12 @@ export class FixedClusterPool< worker.on('message', listener) } + /** @inheritdoc */ protected createWorker (): Worker { return fork(this.opts.env) } + /** @inheritdoc */ protected afterWorkerSetup (worker: Worker): void { // Listen worker messages. this.registerWorkerMessageListener(worker, super.workerListener()) diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index abd1b6b8..7dda3f30 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -41,6 +41,7 @@ export class FixedThreadPool< super(numberOfThreads, filePath, opts) } + /** @inheritdoc */ protected isMain (): boolean { return isMainThread } @@ -53,6 +54,7 @@ export class FixedThreadPool< await worker.terminate() } + /** @inheritdoc */ protected sendToWorker ( worker: ThreadWorkerWithMessageChannel, message: MessageValue @@ -68,12 +70,14 @@ export class FixedThreadPool< messageChannel.port2?.on('message', listener) } + /** @inheritdoc */ protected createWorker (): ThreadWorkerWithMessageChannel { return new Worker(this.filePath, { env: SHARE_ENV }) } + /** @inheritdoc */ protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void { const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 00000000..b1521c4c --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,6 @@ +/** + * An intentional empty function. + */ +export const EMPTY_FUNCTION: () => void = () => { + /* Intentionally empty */ +} diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 4f741b47..26468aeb 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -2,6 +2,7 @@ import { AsyncResource } from 'async_hooks' import type { Worker } from 'cluster' import type { MessagePort } from 'worker_threads' import type { MessageValue } from '../utility-types' +import { EMPTY_FUNCTION } from '../utils' import type { KillBehavior, WorkerOptions } from './worker-options' import { KillBehaviors } from './worker-options' @@ -156,10 +157,10 @@ export abstract class AbstractWorker< try { const res = fn(value.data) this.sendToMainWorker({ data: res, id: value.id }) - this.lastTask = Date.now() } catch (e) { const err = this.handleError(e) this.sendToMainWorker({ error: err, id: value.id }) + } finally { this.lastTask = Date.now() } } @@ -177,13 +178,15 @@ export abstract class AbstractWorker< fn(value.data) .then(res => { this.sendToMainWorker({ data: res, id: value.id }) - this.lastTask = Date.now() return null }) .catch(e => { const err = this.handleError(e) this.sendToMainWorker({ error: err, id: value.id }) + }) + .finally(() => { this.lastTask = Date.now() }) + .catch(EMPTY_FUNCTION) } } diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 6dea4c5e..2c326a67 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -33,10 +33,12 @@ export class ClusterWorker< super('worker-cluster-pool:pioardi', isMaster, fn, worker, opts) } + /** @inheritdoc */ protected sendToMainWorker (message: MessageValue): void { this.getMainWorker().send(message) } + /** @inheritdoc */ protected handleError (e: Error | string): string { return e instanceof Error ? e.message : e } diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 1070ed31..4c458aa1 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -33,6 +33,7 @@ export class ThreadWorker< super('worker-thread-pool:pioardi', isMainThread, fn, parentPort, opts) } + /** @inheritdoc */ protected sendToMainWorker (message: MessageValue): void { this.getMainWorker().postMessage(message) } diff --git a/tests/worker-files/cluster/testWorker.js b/tests/worker-files/cluster/testWorker.js index 7caad947..54938607 100644 --- a/tests/worker-files/cluster/testWorker.js +++ b/tests/worker-files/cluster/testWorker.js @@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index') const { isMaster } = require('cluster') function test (data) { - for (let i = 0; i <= 50; i++) { + for (let i = 0; i < 50; i++) { const o = { a: i } diff --git a/tests/worker-files/thread/testWorker.js b/tests/worker-files/thread/testWorker.js index c70069c7..369dbdba 100644 --- a/tests/worker-files/thread/testWorker.js +++ b/tests/worker-files/thread/testWorker.js @@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index') const { isMainThread } = require('worker_threads') function test (data) { - for (let i = 0; i <= 50; i++) { + for (let i = 0; i < 50; i++) { const o = { a: i } -- 2.34.1