Avoid to on-by-one in worker function. (#285)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 22 Mar 2021 10:58:03 +0000 (11:58 +0100)
committerGitHub <noreply@github.com>
Mon, 22 Mar 2021 10:58:03 +0000 (11:58 +0100)
12 files changed:
benchmarks/internal/cluster/worker.js
benchmarks/internal/thread/worker.js
benchmarks/versus-external-pools/functions/function-to-bench.js
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/utils.ts [new file with mode: 0644]
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/worker-files/cluster/testWorker.js
tests/worker-files/thread/testWorker.js

index 1692c284b218c1decb5b5b451e80b1ff81e0283f..b4d54fca9976e65bbbb76da3ff5d54a4673815ff 100644 (file)
@@ -2,7 +2,7 @@
 const { ClusterWorker } = require('../../../lib/index')
 
 function yourFunction (data) {
-  for (let i = 0; i <= 1000; i++) {
+  for (let i = 0; i < 1000; i++) {
     const o = {
       a: i
     }
index 2ec5f4c2a54ff25550029e28e6ad491c61934bb3..f5f36ed88b82cbfef176e0cbdb1d9969d9a7af1c 100644 (file)
@@ -2,7 +2,7 @@
 const { ThreadWorker } = require('../../../lib/index')
 
 function yourFunction (data) {
-  for (let i = 0; i <= 1000; i++) {
+  for (let i = 0; i < 1000; i++) {
     const o = {
       a: i
     }
index 48a0fc78c8851f6c31c3e87d7d7d35eb3b6e0dc2..565f811219cbccea2cdb1991b1c8a9a337fa9618 100644 (file)
@@ -1,7 +1,7 @@
 module.exports = function (data) {
   if (data.taskType === 'CPU_INTENSIVE') {
     // CPU intensive task
-    for (let i = 0; i <= 5000; i++) {
+    for (let i = 0; i < 5000; i++) {
       const o = {
         a: i
       }
index 54447c82ba2db2e9a88503d6e670f73022a143bc..c3f293e78bc693fba6aea17e36c880968ee58443 100644 (file)
@@ -2,6 +2,7 @@ import type {
   MessageValue,
   PromiseWorkerResponseWrapper
 } from '../utility-types'
+import { EMPTY_FUNCTION } from '../utils'
 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
 import type { IPoolInternal } from './pool-internal'
 import { PoolEmitter, PoolType } from './pool-internal'
@@ -11,13 +12,6 @@ import {
   WorkerChoiceStrategyContext
 } from './selection-strategies'
 
-/**
- * An intentional empty function.
- */
-const EMPTY_FUNCTION: () => void = () => {
-  /* Intentionally empty */
-}
-
 /**
  * Callback invoked if the worker raised an error.
  */
index b522947f25cb6f9497c7935fdcd131d1747e8b34..e1caf252332f7d0103a77a9470b1526a62613e78 100644 (file)
@@ -49,12 +49,14 @@ export class FixedClusterPool<
     super(numberOfWorkers, filePath, opts)
   }
 
+  /** @inheritdoc */
   protected setupHook (): void {
     setupMaster({
       exec: this.filePath
     })
   }
 
+  /** @inheritdoc */
   protected isMain (): boolean {
     return isMaster
   }
@@ -65,6 +67,7 @@ export class FixedClusterPool<
     worker.kill()
   }
 
+  /** @inheritdoc */
   protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
     worker.send(message)
   }
@@ -77,10 +80,12 @@ export class FixedClusterPool<
     worker.on('message', listener)
   }
 
+  /** @inheritdoc */
   protected createWorker (): Worker {
     return fork(this.opts.env)
   }
 
+  /** @inheritdoc */
   protected afterWorkerSetup (worker: Worker): void {
     // Listen worker messages.
     this.registerWorkerMessageListener(worker, super.workerListener())
index abd1b6b8b081ac2740ac96ea6a1265a690ff407b..7dda3f3090f72459a599128213364cb380a7356b 100644 (file)
@@ -41,6 +41,7 @@ export class FixedThreadPool<
     super(numberOfThreads, filePath, opts)
   }
 
+  /** @inheritdoc */
   protected isMain (): boolean {
     return isMainThread
   }
@@ -53,6 +54,7 @@ export class FixedThreadPool<
     await worker.terminate()
   }
 
+  /** @inheritdoc */
   protected sendToWorker (
     worker: ThreadWorkerWithMessageChannel,
     message: MessageValue<Data>
@@ -68,12 +70,14 @@ export class FixedThreadPool<
     messageChannel.port2?.on('message', listener)
   }
 
+  /** @inheritdoc */
   protected createWorker (): ThreadWorkerWithMessageChannel {
     return new Worker(this.filePath, {
       env: SHARE_ENV
     })
   }
 
+  /** @inheritdoc */
   protected afterWorkerSetup (worker: ThreadWorkerWithMessageChannel): void {
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])
diff --git a/src/utils.ts b/src/utils.ts
new file mode 100644 (file)
index 0000000..b1521c4
--- /dev/null
@@ -0,0 +1,6 @@
+/**
+ * An intentional empty function.
+ */
+export const EMPTY_FUNCTION: () => void = () => {
+  /* Intentionally empty */
+}
index 4f741b47acf134146069e33d866b8f40a09e0bb1..26468aeb96ae0d5f838518e2e9308b2e31c16c0e 100644 (file)
@@ -2,6 +2,7 @@ import { AsyncResource } from 'async_hooks'
 import type { Worker } from 'cluster'
 import type { MessagePort } from 'worker_threads'
 import type { MessageValue } from '../utility-types'
+import { EMPTY_FUNCTION } from '../utils'
 import type { KillBehavior, WorkerOptions } from './worker-options'
 import { KillBehaviors } from './worker-options'
 
@@ -156,10 +157,10 @@ export abstract class AbstractWorker<
     try {
       const res = fn(value.data)
       this.sendToMainWorker({ data: res, id: value.id })
-      this.lastTask = Date.now()
     } catch (e) {
       const err = this.handleError(e)
       this.sendToMainWorker({ error: err, id: value.id })
+    } finally {
       this.lastTask = Date.now()
     }
   }
@@ -177,13 +178,15 @@ export abstract class AbstractWorker<
     fn(value.data)
       .then(res => {
         this.sendToMainWorker({ data: res, id: value.id })
-        this.lastTask = Date.now()
         return null
       })
       .catch(e => {
         const err = this.handleError(e)
         this.sendToMainWorker({ error: err, id: value.id })
+      })
+      .finally(() => {
         this.lastTask = Date.now()
       })
+      .catch(EMPTY_FUNCTION)
   }
 }
index 6dea4c5e5bbab624f48c36bce66c46de3deca96d..2c326a67380e96589083a8bb451e4353180e232b 100644 (file)
@@ -33,10 +33,12 @@ export class ClusterWorker<
     super('worker-cluster-pool:pioardi', isMaster, fn, worker, opts)
   }
 
+  /** @inheritdoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
     this.getMainWorker().send(message)
   }
 
+  /** @inheritdoc */
   protected handleError (e: Error | string): string {
     return e instanceof Error ? e.message : e
   }
index 1070ed31d7822a86e6b2d4f96c5a9b7c7274b8dc..4c458aa19d0f5ed47091689deb3727037663100d 100644 (file)
@@ -33,6 +33,7 @@ export class ThreadWorker<
     super('worker-thread-pool:pioardi', isMainThread, fn, parentPort, opts)
   }
 
+  /** @inheritdoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
     this.getMainWorker().postMessage(message)
   }
index 7caad9476da291f19319b981c67e8115ac0e32a7..549386073f3413ac26b44d43dc50fb88ff43a43e 100644 (file)
@@ -3,7 +3,7 @@ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
 const { isMaster } = require('cluster')
 
 function test (data) {
-  for (let i = 0; i <= 50; i++) {
+  for (let i = 0; i < 50; i++) {
     const o = {
       a: i
     }
index c70069c7269f17d2f83069554d0e3fba5d2a15e6..369dbdba477bc8f7f60c50976fd513e387f5ed5e 100644 (file)
@@ -3,7 +3,7 @@ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
 const { isMainThread } = require('worker_threads')
 
 function test (data) {
-  for (let i = 0; i <= 50; i++) {
+  for (let i = 0; i < 50; i++) {
     const o = {
       a: i
     }