From 8a2bf757c04a2056ff94dda488e87d817c7291d5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 6 Jul 2025 20:40:58 +0200 Subject: [PATCH] fix: task queueing fixes (#2912) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * fix: task queueing fixes Signed-off-by: Jérôme Benoit * refactor: code cleanups Signed-off-by: Jérôme Benoit * fix: worker removal in IWRR worker selection Signed-off-by: Jérôme Benoit --------- Signed-off-by: Jérôme Benoit --- package.json | 2 +- pnpm-lock.yaml | 355 +++++++++--------- src/circular-buffer.ts | 30 +- src/pools/abstract-pool.ts | 89 ++--- .../abstract-worker-choice-strategy.ts | 17 +- .../fair-share-worker-choice-strategy.ts | 34 +- ...hted-round-robin-worker-choice-strategy.ts | 17 +- .../least-busy-worker-choice-strategy.ts | 22 +- .../least-elu-worker-choice-strategy.ts | 18 +- .../least-used-worker-choice-strategy.ts | 21 +- .../round-robin-worker-choice-strategy.ts | 30 +- ...hted-round-robin-worker-choice-strategy.ts | 40 +- .../worker-choice-strategies-context.ts | 14 +- src/pools/worker-node.ts | 18 +- src/queues/abstract-fixed-queue.ts | 46 ++- src/queues/fixed-priority-queue.ts | 36 +- src/queues/priority-queue.ts | 177 ++++----- src/queues/queue-types.ts | 2 +- src/worker/utils.ts | 16 +- tests/circular-buffer.test.mjs | 21 +- .../selection-strategies.test.mjs | 4 +- tests/pools/worker-node.test.mjs | 2 - tests/worker/abstract-worker.test.mjs | 10 +- 23 files changed, 537 insertions(+), 484 deletions(-) diff --git a/package.json b/package.json index e11a65237..3c0aa3657 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,7 @@ "@biomejs/biome": "^2.0.6", "@commitlint/cli": "^19.8.1", "@commitlint/config-conventional": "^19.8.1", - "@cspell/eslint-plugin": "^9.1.2", + "@cspell/eslint-plugin": "^9.1.3", "@eslint/js": "^9.30.1", "@rollup/plugin-terser": "^0.4.4", "@rollup/plugin-typescript": "^12.1.4", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 85a575610..4a30c0adb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21,8 +21,8 @@ importers: specifier: ^19.8.1 version: 19.8.1 '@cspell/eslint-plugin': - specifier: ^9.1.2 - version: 9.1.2(eslint@9.30.1(jiti@2.4.2)) + specifier: ^9.1.3 + version: 9.1.3(eslint@9.30.1(jiti@2.4.2)) '@eslint/js': specifier: ^9.30.1 version: 9.30.1 @@ -242,24 +242,24 @@ packages: resolution: {integrity: sha512-/yCrWGCoA1SVKOks25EGadP9Pnj0oAIHGpl2wH2M2Y46dPM2ueb8wyCVOD7O3WCTkaJ0IkKvzhl1JY7+uCT2Dw==} engines: {node: '>=v18'} - '@cspell/cspell-bundled-dicts@9.1.2': - resolution: {integrity: sha512-mdhxj7j1zqXYKO/KPx2MgN3RPAvqoWvncxz2dOMFBcuUteZPt58NenUoi0VZXEhV/FM2V80NvhHZZafaIcxVjQ==} + '@cspell/cspell-bundled-dicts@9.1.3': + resolution: {integrity: sha512-WbOkD32fjxz0hHMP6oTvAgi2VBlzYcqKPNwCo+4b9HefLWV5aiLaxp04d8CeifaAdlYjkjuqRTJXh/HfUeLCVg==} engines: {node: '>=20'} - '@cspell/cspell-pipe@9.1.2': - resolution: {integrity: sha512-/pIhsf4SI4Q/kvehq9GsGKLgbQsRhiDgthQIgO6YOrEa761wOI2hVdRyc0Tgc1iAGiJEedDaFsAhabVRJBeo2g==} + '@cspell/cspell-pipe@9.1.3': + resolution: {integrity: sha512-Cns37ml7IaXMWBci9XOqdTkP9nFtOO8+sJ4VvtbVO68Zo8v0vq74ApDbPgGI2HzYtn7Jj2hxQqGIBdLnmrMPyA==} engines: {node: '>=20'} - '@cspell/cspell-resolver@9.1.2': - resolution: {integrity: sha512-dNDx7yMl2h1Ousk08lizTou+BUvce4RPSnPXrQPB7B7CscgZloSyuP3Yyj1Zt81pHNpggrym4Ezx6tMdyPjESw==} + '@cspell/cspell-resolver@9.1.3': + resolution: {integrity: sha512-3h9AkbY+YutBG91fQxeSpfIRT50sfrNQ7IAS0N6fCvJ6z0sXed7UPYwf90NauQp/1lN/bVlHFFAgxDEyG720Yg==} engines: {node: '>=20'} - '@cspell/cspell-service-bus@9.1.2': - resolution: {integrity: sha512-YOsUctzCMzEJbKdzNyvPkyMen/i7sGO3Xgcczn848GJPlRsJc50QwsoU67SY7zEARz6y2WS0tv5F5RMrRO4idw==} + '@cspell/cspell-service-bus@9.1.3': + resolution: {integrity: sha512-Ss4cCnkJI3IHDSOQKxhtAfypvZZDzuJeXbZFVimLvO22/8GdVH+vQxAFm3kBY+ACVUAe13MQIYzZxuFHaM9y8g==} engines: {node: '>=20'} - '@cspell/cspell-types@9.1.2': - resolution: {integrity: sha512-bSDDjoQi4pbh1BULEA596XCo1PMShTpTb4J2lj8jVYqYgXYQNjSmQFA1fj4NHesC84JpK1um4ybzXBcqtniC7Q==} + '@cspell/cspell-types@9.1.3': + resolution: {integrity: sha512-JPLFMp6qKj4fjsEDvMjVXFZg+j3HaRQ7raFtR2RPidYyKcUHPCVhX0wfJ0vuYxkC0Yst+99tgVxR8Wi57xs2Ew==} engines: {node: '>=20'} '@cspell/dict-ada@4.1.0': @@ -268,8 +268,8 @@ packages: '@cspell/dict-al@1.1.0': resolution: {integrity: sha512-PtNI1KLmYkELYltbzuoztBxfi11jcE9HXBHCpID2lou/J4VMYKJPNqe4ZjVzSI9NYbMnMnyG3gkbhIdx66VSXg==} - '@cspell/dict-aws@4.0.10': - resolution: {integrity: sha512-0qW4sI0GX8haELdhfakQNuw7a2pnWXz3VYQA2MpydH2xT2e6EN9DWFpKAi8DfcChm8MgDAogKkoHtIo075iYng==} + '@cspell/dict-aws@4.0.11': + resolution: {integrity: sha512-nesbrYbxP/ek7Nc3X1ENWxAXJ/2XIKGxauF0k4VSPLtMvWP50gHAEe+zmqFciFolwIVVjF2l+juDdUdBMPbMiw==} '@cspell/dict-bash@4.2.0': resolution: {integrity: sha512-HOyOS+4AbCArZHs/wMxX/apRkjxg6NDWdt0jF9i9XkvJQUltMwEhyA2TWYjQ0kssBsnof+9amax2lhiZnh3kCg==} @@ -432,8 +432,8 @@ packages: '@cspell/dict-swift@2.0.5': resolution: {integrity: sha512-3lGzDCwUmnrfckv3Q4eVSW3sK3cHqqHlPprFJZD4nAqt23ot7fic5ALR7J4joHpvDz36nHX34TgcbZNNZOC/JA==} - '@cspell/dict-terraform@1.1.1': - resolution: {integrity: sha512-07KFDwCU7EnKl4hOZLsLKlj6Zceq/IsQ3LRWUyIjvGFfZHdoGtFdCp3ZPVgnFaAcd/DKv+WVkrOzUBSYqHopQQ==} + '@cspell/dict-terraform@1.1.2': + resolution: {integrity: sha512-RB9dnhxKIiWpwQB+b3JuFa8X4m+6Ny92Y4Z5QARR7jEtapg8iF2ODZX1yLtozp4kFVoRsUKEP6vj3MLv87VTdg==} '@cspell/dict-typescript@3.2.2': resolution: {integrity: sha512-H9Y+uUHsTIDFO/jdfUAcqmcd5osT+2DB5b0aRCHfLWN/twUbGn/1qq3b7YwEvttxKlYzWHU3uNFf+KfA93VY7w==} @@ -441,26 +441,26 @@ packages: '@cspell/dict-vue@3.0.4': resolution: {integrity: sha512-0dPtI0lwHcAgSiQFx8CzvqjdoXROcH+1LyqgROCpBgppommWpVhbQ0eubnKotFEXgpUCONVkeZJ6Ql8NbTEu+w==} - '@cspell/dynamic-import@9.1.2': - resolution: {integrity: sha512-Kg22HCx5m0znVPLea2jRrvMnzHZAAzqcDr5g6Dbd4Pizs5b3SPQuRpFmYaDvKo26JNZnfRqA9eweiuE5aQAf2A==} + '@cspell/dynamic-import@9.1.3': + resolution: {integrity: sha512-+8PxTslsh+oTxmhYdnfQZ/brYGFAnfqLR9xotWE4Ks3HoaLOhZsp6FF9kvlEp/gNOjpyhHn1UhT/Gr5fT4+QhQ==} engines: {node: '>=20'} - '@cspell/eslint-plugin@9.1.2': - resolution: {integrity: sha512-UUCCBAyv3gTL1P19fX9C+cknkwCXHvnHUAaFBz25dX6PhJSPyYPmVdA8jm/2H6+GQYKBnHvWgfjkkiZgtqoQRA==} + '@cspell/eslint-plugin@9.1.3': + resolution: {integrity: sha512-0CqTAs1opvqpdI75d4h3KbI4n1l/JtCd7Wa3zkdalYoY+A4z/6vK222tJTnNgJguxG1usy1jd+snqOqdr3IJSg==} engines: {node: '>=20'} peerDependencies: eslint: ^7 || ^8 || ^9 - '@cspell/filetypes@9.1.2': - resolution: {integrity: sha512-j+6kDz3GbeYwwtlzVosqVaSiFGMhf0u3y8eAP3IV2bTelhP2ZiOLD+yNbAyYGao7p10/Sqv+Ri0yT7IsGLniww==} + '@cspell/filetypes@9.1.3': + resolution: {integrity: sha512-HRJEggDo6OJJmCc/gq7oriMqkqVDema+oLpGBh1a/M7ulw+CzoHkOa//1ohpAJh5KsWj9Tej9Va4BUZ/SaCwUA==} engines: {node: '>=20'} - '@cspell/strong-weak-map@9.1.2': - resolution: {integrity: sha512-6X9oXnklvdt1pd0x0Mh6qXaaIRxjt0G50Xz5ZGm3wpAagv0MFvTThdmYVFfBuZ91x7fDT3u77y3d1uqdGQW1CA==} + '@cspell/strong-weak-map@9.1.3': + resolution: {integrity: sha512-+96SI9R6TOY+xGBOK5LiOgX/W/9gAKus1Cvngh2LdtDVZwgVqpqvm5LoXxLhUT+Vs5UsndRBzblSdNpziSwZtA==} engines: {node: '>=20'} - '@cspell/url@9.1.2': - resolution: {integrity: sha512-PMJBuLYQIdFnEfPHQXaVE5hHUkbbOxOIRmHyZwWEc9+79tIaIkiwLpjZvbm8p6f9WXAaESqXs/uK2tUC/bjwmw==} + '@cspell/url@9.1.3': + resolution: {integrity: sha512-LQQKY0O4QYUNKyDod8VfEBvqeJNGHJlx1v0gDq00eMvaClnkIz+y2ObGdtDlF7ZbG7TgI6PQ3ahJdlqfRPe3ZQ==} engines: {node: '>=20'} '@emnapi/core@1.4.3': @@ -860,98 +860,98 @@ packages: resolution: {integrity: sha512-VRwixir4zBWCSTP/ljEo091lbpypz57PoeAQ9imjG+vbeof9LplljsL1mos4ccG6H9IjfrVGM359RozUnuFhpw==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - '@unrs/resolver-binding-android-arm-eabi@1.10.1': - resolution: {integrity: sha512-zohDKXT1Ok0yhbVGff4YAg9HUs5ietG5GpvJBPFSApZnGe7uf2cd26DRhKZbn0Be6xHUZrSzP+RAgMmzyc71EA==} + '@unrs/resolver-binding-android-arm-eabi@1.11.0': + resolution: {integrity: sha512-LRw5BW29sYj9NsQC6QoqeLVQhEa+BwVINYyMlcve+6stwdBsSt5UB7zw4UZB4+4PNqIVilHoMaPWCb/KhABHQw==} cpu: [arm] os: [android] - '@unrs/resolver-binding-android-arm64@1.10.1': - resolution: {integrity: sha512-tAN6k5UrTd4nicpA7s2PbjR/jagpDzAmvXFjbpTazUe5FRsFxVcBlS1F5Lzp5jtWU6bdiqRhSvd4X8rdpCffeA==} + '@unrs/resolver-binding-android-arm64@1.11.0': + resolution: {integrity: sha512-zYX8D2zcWCAHqghA8tPjbp7LwjVXbIZP++mpU/Mrf5jUVlk3BWIxkeB8yYzZi5GpFSlqMcRZQxQqbMI0c2lASQ==} cpu: [arm64] os: [android] - '@unrs/resolver-binding-darwin-arm64@1.10.1': - resolution: {integrity: sha512-+FCsag8WkauI4dQ50XumCXdfvDCZEpMUnvZDsKMxfOisnEklpDFXc6ThY0WqybBYZbiwR5tWcFaZmI0G6b4vrg==} + '@unrs/resolver-binding-darwin-arm64@1.11.0': + resolution: {integrity: sha512-YsYOT049hevAY/lTYD77GhRs885EXPeAfExG5KenqMJ417nYLS2N/kpRpYbABhFZBVQn+2uRPasTe4ypmYoo3w==} cpu: [arm64] os: [darwin] - '@unrs/resolver-binding-darwin-x64@1.10.1': - resolution: {integrity: sha512-qYKGGm5wk71ONcXTMZ0+J11qQeOAPz3nw6VtqrBUUELRyXFyvK8cHhHsLBFR4GHnilc2pgY1HTB2TvdW9wO26Q==} + '@unrs/resolver-binding-darwin-x64@1.11.0': + resolution: {integrity: sha512-PSjvk3OZf1aZImdGY5xj9ClFG3bC4gnSSYWrt+id0UAv+GwwVldhpMFjAga8SpMo2T1GjV9UKwM+QCsQCQmtdA==} cpu: [x64] os: [darwin] - '@unrs/resolver-binding-freebsd-x64@1.10.1': - resolution: {integrity: sha512-hOHMAhbvIQ63gkpgeNsXcWPSyvXH7ZEyeg254hY0Lp/hX8NdW+FsUWq73g9946Pc/BrcVI/I3C1cmZ4RCX9bNw==} + '@unrs/resolver-binding-freebsd-x64@1.11.0': + resolution: {integrity: sha512-KC/iFaEN/wsTVYnHClyHh5RSYA9PpuGfqkFua45r4sweXpC0KHZ+BYY7ikfcGPt5w1lMpR1gneFzuqWLQxsRKg==} cpu: [x64] os: [freebsd] - '@unrs/resolver-binding-linux-arm-gnueabihf@1.10.1': - resolution: {integrity: sha512-6ds7+zzHJgTDmpe0gmFcOTvSUhG5oZukkt+cCsSb3k4Uiz2yEQB4iCRITX2hBwSW+p8gAieAfecITjgqCkswXw==} + '@unrs/resolver-binding-linux-arm-gnueabihf@1.11.0': + resolution: {integrity: sha512-CDh/0v8uot43cB4yKtDL9CVY8pbPnMV0dHyQCE4lFz6PW/+9tS0i9eqP5a91PAqEBVMqH1ycu+k8rP6wQU846w==} cpu: [arm] os: [linux] - '@unrs/resolver-binding-linux-arm-musleabihf@1.10.1': - resolution: {integrity: sha512-P7A0G2/jW00diNJyFeq4W9/nxovD62Ay8CMP4UK9OymC7qO7rG1a8Upad68/bdfpIOn7KSp7Aj/6lEW3yyznAA==} + '@unrs/resolver-binding-linux-arm-musleabihf@1.11.0': + resolution: {integrity: sha512-+TE7epATDSnvwr3L/hNHX3wQ8KQYB+jSDTdywycg3qDqvavRP8/HX9qdq/rMcnaRDn4EOtallb3vL/5wCWGCkw==} cpu: [arm] os: [linux] - '@unrs/resolver-binding-linux-arm64-gnu@1.10.1': - resolution: {integrity: sha512-Cg6xzdkrpltcTPO4At+A79zkC7gPDQIgosJmVV8M104ImB6KZi1MrNXgDYIAfkhUYjPzjNooEDFRAwwPadS7ZA==} + '@unrs/resolver-binding-linux-arm64-gnu@1.11.0': + resolution: {integrity: sha512-VBAYGg3VahofpQ+L4k/ZO8TSICIbUKKTaMYOWHWfuYBFqPbSkArZZLezw3xd27fQkxX4BaLGb/RKnW0dH9Y/UA==} cpu: [arm64] os: [linux] - '@unrs/resolver-binding-linux-arm64-musl@1.10.1': - resolution: {integrity: sha512-aNeg99bVkXa4lt+oZbjNRPC8ZpjJTKxijg/wILrJdzNyAymO2UC/HUK1UfDjt6T7U5p/mK24T3CYOi3/+YEQSA==} + '@unrs/resolver-binding-linux-arm64-musl@1.11.0': + resolution: {integrity: sha512-9IgGFUUb02J1hqdRAHXpZHIeUHRrbnGo6vrRbz0fREH7g+rzQy53/IBSyadZ/LG5iqMxukriNPu4hEMUn+uWEg==} cpu: [arm64] os: [linux] - '@unrs/resolver-binding-linux-ppc64-gnu@1.10.1': - resolution: {integrity: sha512-ylz5ojeXrkPrtnzVhpCO+YegG63/aKhkoTlY8PfMfBfLaUG8v6m6iqrL7sBUKdVBgOB4kSTUPt9efQdA/Y3Z/w==} + '@unrs/resolver-binding-linux-ppc64-gnu@1.11.0': + resolution: {integrity: sha512-LR4iQ/LPjMfivpL2bQ9kmm3UnTas3U+umcCnq/CV7HAkukVdHxrDD1wwx74MIWbbgzQTLPYY7Ur2MnnvkYJCBQ==} cpu: [ppc64] os: [linux] - '@unrs/resolver-binding-linux-riscv64-gnu@1.10.1': - resolution: {integrity: sha512-xcWyhmJfXXOxK7lvE4+rLwBq+on83svlc0AIypfe6x4sMJR+S4oD7n9OynaQShfj2SufPw2KJAotnsNb+4nN2g==} + '@unrs/resolver-binding-linux-riscv64-gnu@1.11.0': + resolution: {integrity: sha512-HCupFQwMrRhrOg7YHrobbB5ADg0Q8RNiuefqMHVsdhEy9lLyXm/CxsCXeLJdrg27NAPsCaMDtdlm8Z2X8x91Tg==} cpu: [riscv64] os: [linux] - '@unrs/resolver-binding-linux-riscv64-musl@1.10.1': - resolution: {integrity: sha512-mW9JZAdOCyorgi1eLJr4gX7xS67WNG9XNPYj5P8VuttK72XNsmdw9yhOO4tDANMgiLXFiSFaiL1gEpoNtRPw/A==} + '@unrs/resolver-binding-linux-riscv64-musl@1.11.0': + resolution: {integrity: sha512-Ckxy76A5xgjWa4FNrzcKul5qFMWgP5JSQ5YKd0XakmWOddPLSkQT+uAvUpQNnFGNbgKzv90DyQlxPDYPQ4nd6A==} cpu: [riscv64] os: [linux] - '@unrs/resolver-binding-linux-s390x-gnu@1.10.1': - resolution: {integrity: sha512-NZGKhBy6xkJ0k09cWNZz4DnhBcGlhDd3W+j7EYoNvf5TSwj2K6kbmfqTWITEgkvjsMUjm1wsrc4IJaH6VtjyHQ==} + '@unrs/resolver-binding-linux-s390x-gnu@1.11.0': + resolution: {integrity: sha512-HfO0PUCCRte2pMJmVyxPI+eqT7KuV3Fnvn2RPvMe5mOzb2BJKf4/Vth8sSt9cerQboMaTVpbxyYjjLBWIuI5BQ==} cpu: [s390x] os: [linux] - '@unrs/resolver-binding-linux-x64-gnu@1.10.1': - resolution: {integrity: sha512-VsjgckJ0gNMw7p0d8In6uPYr+s0p16yrT2rvG4v2jUpEMYkpnfnCiALa9SWshbvlGjKQ98Q2x19agm3iFk8w8Q==} + '@unrs/resolver-binding-linux-x64-gnu@1.11.0': + resolution: {integrity: sha512-9PZdjP7tLOEjpXHS6+B/RNqtfVUyDEmaViPOuSqcbomLdkJnalt5RKQ1tr2m16+qAufV0aDkfhXtoO7DQos/jg==} cpu: [x64] os: [linux] - '@unrs/resolver-binding-linux-x64-musl@1.10.1': - resolution: {integrity: sha512-idMnajMeejnaFi0Mx9UTLSYFDAOTfAEP7VjXNgxKApso3Eu2Njs0p2V95nNIyFi4oQVGFmIuCkoznAXtF/Zbmw==} + '@unrs/resolver-binding-linux-x64-musl@1.11.0': + resolution: {integrity: sha512-qkE99ieiSKMnFJY/EfyGKVtNra52/k+lVF/PbO4EL5nU6AdvG4XhtJ+WHojAJP7ID9BNIra/yd75EHndewNRfA==} cpu: [x64] os: [linux] - '@unrs/resolver-binding-wasm32-wasi@1.10.1': - resolution: {integrity: sha512-7jyhjIRNFjzlr8x5pth6Oi9hv3a7ubcVYm2GBFinkBQKcFhw4nIs5BtauSNtDW1dPIGrxF0ciynCZqzxMrYMsg==} + '@unrs/resolver-binding-wasm32-wasi@1.11.0': + resolution: {integrity: sha512-MjXek8UL9tIX34gymvQLecz2hMaQzOlaqYJJBomwm1gsvK2F7hF+YqJJ2tRyBDTv9EZJGMt4KlKkSD/gZWCOiw==} engines: {node: '>=14.0.0'} cpu: [wasm32] - '@unrs/resolver-binding-win32-arm64-msvc@1.10.1': - resolution: {integrity: sha512-TY79+N+Gkoo7E99K+zmsKNeiuNJYlclZJtKqsHSls8We2iGhgxtletVsiBYie93MSTDRDMI8pkBZJlIJSZPrdA==} + '@unrs/resolver-binding-win32-arm64-msvc@1.11.0': + resolution: {integrity: sha512-9LT6zIGO7CHybiQSh7DnQGwFMZvVr0kUjah6qQfkH2ghucxPV6e71sUXJdSM4Ba0MaGE6DC/NwWf7mJmc3DAng==} cpu: [arm64] os: [win32] - '@unrs/resolver-binding-win32-ia32-msvc@1.10.1': - resolution: {integrity: sha512-BAJN5PEPlEV+1m8+PCtFoKm3LQ1P57B4Z+0+efU0NzmCaGk7pUaOxuPgl+m3eufVeeNBKiPDltG0sSB9qEfCxw==} + '@unrs/resolver-binding-win32-ia32-msvc@1.11.0': + resolution: {integrity: sha512-HYchBYOZ7WN266VjoGm20xFv5EonG/ODURRgwl9EZT7Bq1nLEs6VKJddzfFdXEAho0wfFlt8L/xIiE29Pmy1RA==} cpu: [ia32] os: [win32] - '@unrs/resolver-binding-win32-x64-msvc@1.10.1': - resolution: {integrity: sha512-2v3erKKmmCyIVvvhI2nF15qEbdBpISTq44m9pyd5gfIJB1PN94oePTLWEd82XUbIbvKhv76xTSeUQSCOGesLeg==} + '@unrs/resolver-binding-win32-x64-msvc@1.11.0': + resolution: {integrity: sha512-+oLKLHw3I1UQo4MeHfoLYF+e6YBa8p5vYUw3Rgt7IDzCs+57vIZqQlIo62NDpYM0VG6BjWOwnzBczMvbtH8hag==} cpu: [x64] os: [win32] @@ -1202,33 +1202,33 @@ packages: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} engines: {node: '>= 8'} - cspell-config-lib@9.1.2: - resolution: {integrity: sha512-QvHHGUuMI5h3ymU6O/Qz8zfhMhvPTuopT1FgebYRBB1cyggl4KnEJKU9m7wy/SQ1IGSlFDtQp6rCy70ujTfavQ==} + cspell-config-lib@9.1.3: + resolution: {integrity: sha512-B3DdOTZNIOQahSkOYqaq2fOc8fq/jFkrOFd36kge/GAyEpY2Um/Kp/GQ6caOcev+ju0h3iGaO24OLCx6QJ3YoQ==} engines: {node: '>=20'} - cspell-dictionary@9.1.2: - resolution: {integrity: sha512-Osn5f9ugkX/zA3PVtSmYKRer3gZX3YqVB0UH0wVNzi8Ryl/1RUuYLIcvd0SDEhiVW56WKxFLfZ5sggTz/l9cDA==} + cspell-dictionary@9.1.3: + resolution: {integrity: sha512-BXWwYQ64LaSOd7+8TLZax3AeUnTJUuIl+Tl32/dqcVpgDF4P0eAUVE5xap+QZ2rzKRVFjD8r5M6IR2QrA23o0g==} engines: {node: '>=20'} - cspell-glob@9.1.2: - resolution: {integrity: sha512-l7Mqirn5h2tilTXgRamRIqqnzeA7R5iJEtJkY/zHDMEBeLWTR/5ai7dBp2+ooe8gIebpDtvv4938IXa5/75E6g==} + cspell-glob@9.1.3: + resolution: {integrity: sha512-If7gSgbWlUhLcmNA9zPflWzdUZs4wyRKB/Ze584wrht7zJR4yJm2Rptk2+M8kXEhx3zYS6UGhSL0alPbVAbjgQ==} engines: {node: '>=20'} - cspell-grammar@9.1.2: - resolution: {integrity: sha512-vUcnlUqJKK0yhwYHfGC71zjGyEn918l64U/NWb1ijn1VXrL6gsh3w8Acwdo++zbpOASd9HTAuuZelveDJKLLgA==} + cspell-grammar@9.1.3: + resolution: {integrity: sha512-L1OVY9RyZXPT+qesw0c7aRKTxQIC7nrLKDQ97hRrQhK23hv5Q8o7GVs1S7pXRNZ/oA8V+VNG2CgjLiKnVM2jnw==} engines: {node: '>=20'} hasBin: true - cspell-io@9.1.2: - resolution: {integrity: sha512-oLPxbteI+uFV9ZPcJjII7Lr/C/gVXpdmDLlAMwR8/7LHGnEfxXR0lqYu5GZVEvZ7riX9whCUOsQWQQqr2u2Fzw==} + cspell-io@9.1.3: + resolution: {integrity: sha512-fdgAVrthOY1pPsBZHWVjEVn6uHMAshj2n75eu2rvUd6EcmMuLR13EcIXHoMcQo/1Az05x2UgG7HuK+0MuRcikQ==} engines: {node: '>=20'} - cspell-lib@9.1.2: - resolution: {integrity: sha512-OFCssgfp6Z2gd1K8j2FsYr9YGoA/C6xXlcUwgU75Ut/XMZ/S44chdA9fUupGd4dUOw+CZl0qKzSP21J6kYObIw==} + cspell-lib@9.1.3: + resolution: {integrity: sha512-egESsnErAPtC/wuqbHWW28eRKChkg5h+vFQQuZ0iThuOSZ65jeSM0ESOt8W3TH2JD7EGo2pvPED/7rZjjnMIcQ==} engines: {node: '>=20'} - cspell-trie-lib@9.1.2: - resolution: {integrity: sha512-TkIQaknRRusUznqy+HwpqKCETCAznrzPJJHRHi8m6Zo3tAMsnIpaBQPRN8xem6w8/r/yJqFhLrsLSma0swyviQ==} + cspell-trie-lib@9.1.3: + resolution: {integrity: sha512-fvI0ede/rPr+SB0zX8le426c5lroNdmMTkl4fFk2e0w5/JZRHIfkuenhWe0MZeb18d1NPRIiLgxoD87zswLynw==} engines: {node: '>=20'} dargs@8.1.0: @@ -2476,6 +2476,10 @@ packages: smob@1.5.0: resolution: {integrity: sha512-g6T+p7QO8npa+/hNx9ohv1E5pVCmWrVCUzUXJyLdMmftX6ER0oiWY/w9knEonLpnOp6b6FenKnMfR8gqwWdwig==} + smol-toml@1.4.1: + resolution: {integrity: sha512-CxdwHXyYTONGHThDbq5XdwbFsuY4wlClRGejfE2NtwUtiHYsP1QtNsHb/hnj31jKYSchztJsaA8pSQoVzkfCFg==} + engines: {node: '>= 18'} + source-map-support@0.5.21: resolution: {integrity: sha512-uBHU3L3czsIyYXKX88fdrGovxdSCoTGDRZ6SYXtSRxLZUzHg5P/66Ht6uoUlHu9EZod+inXhKo3qQgwXUT/y1w==} @@ -2696,8 +2700,8 @@ packages: resolution: {integrity: sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==} engines: {node: '>= 10.0.0'} - unrs-resolver@1.10.1: - resolution: {integrity: sha512-EFrL7Hw4kmhZdwWO3dwwFJo6hO3FXuQ6Bg8BK/faHZ9m1YxqBS31BNSTxklIQkxK/4LlV8zTYnPsIRLBzTzjCA==} + unrs-resolver@1.11.0: + resolution: {integrity: sha512-uw3hCGO/RdAEAb4zgJ3C/v6KIAFFOtBoxR86b2Ejc5TnH7HrhTWJR2o0A9ullC3eWMegKQCw/arQ/JivywQzkg==} uri-js@4.4.1: resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==} @@ -2950,11 +2954,11 @@ snapshots: '@types/conventional-commits-parser': 5.0.1 chalk: 5.4.1 - '@cspell/cspell-bundled-dicts@9.1.2': + '@cspell/cspell-bundled-dicts@9.1.3': dependencies: '@cspell/dict-ada': 4.1.0 '@cspell/dict-al': 1.1.0 - '@cspell/dict-aws': 4.0.10 + '@cspell/dict-aws': 4.0.11 '@cspell/dict-bash': 4.2.0 '@cspell/dict-companies': 3.2.1 '@cspell/dict-cpp': 6.0.8 @@ -3007,25 +3011,25 @@ snapshots: '@cspell/dict-sql': 2.2.0 '@cspell/dict-svelte': 1.0.6 '@cspell/dict-swift': 2.0.5 - '@cspell/dict-terraform': 1.1.1 + '@cspell/dict-terraform': 1.1.2 '@cspell/dict-typescript': 3.2.2 '@cspell/dict-vue': 3.0.4 - '@cspell/cspell-pipe@9.1.2': {} + '@cspell/cspell-pipe@9.1.3': {} - '@cspell/cspell-resolver@9.1.2': + '@cspell/cspell-resolver@9.1.3': dependencies: global-directory: 4.0.1 - '@cspell/cspell-service-bus@9.1.2': {} + '@cspell/cspell-service-bus@9.1.3': {} - '@cspell/cspell-types@9.1.2': {} + '@cspell/cspell-types@9.1.3': {} '@cspell/dict-ada@4.1.0': {} '@cspell/dict-al@1.1.0': {} - '@cspell/dict-aws@4.0.10': {} + '@cspell/dict-aws@4.0.11': {} '@cspell/dict-bash@4.2.0': dependencies: @@ -3140,30 +3144,30 @@ snapshots: '@cspell/dict-swift@2.0.5': {} - '@cspell/dict-terraform@1.1.1': {} + '@cspell/dict-terraform@1.1.2': {} '@cspell/dict-typescript@3.2.2': {} '@cspell/dict-vue@3.0.4': {} - '@cspell/dynamic-import@9.1.2': + '@cspell/dynamic-import@9.1.3': dependencies: - '@cspell/url': 9.1.2 + '@cspell/url': 9.1.3 import-meta-resolve: 4.1.0 - '@cspell/eslint-plugin@9.1.2(eslint@9.30.1(jiti@2.4.2))': + '@cspell/eslint-plugin@9.1.3(eslint@9.30.1(jiti@2.4.2))': dependencies: - '@cspell/cspell-types': 9.1.2 - '@cspell/url': 9.1.2 - cspell-lib: 9.1.2 + '@cspell/cspell-types': 9.1.3 + '@cspell/url': 9.1.3 + cspell-lib: 9.1.3 eslint: 9.30.1(jiti@2.4.2) synckit: 0.11.8 - '@cspell/filetypes@9.1.2': {} + '@cspell/filetypes@9.1.3': {} - '@cspell/strong-weak-map@9.1.2': {} + '@cspell/strong-weak-map@9.1.3': {} - '@cspell/url@9.1.2': {} + '@cspell/url@9.1.3': {} '@emnapi/core@1.4.3': dependencies: @@ -3583,63 +3587,63 @@ snapshots: '@typescript-eslint/types': 8.35.1 eslint-visitor-keys: 4.2.1 - '@unrs/resolver-binding-android-arm-eabi@1.10.1': + '@unrs/resolver-binding-android-arm-eabi@1.11.0': optional: true - '@unrs/resolver-binding-android-arm64@1.10.1': + '@unrs/resolver-binding-android-arm64@1.11.0': optional: true - '@unrs/resolver-binding-darwin-arm64@1.10.1': + '@unrs/resolver-binding-darwin-arm64@1.11.0': optional: true - '@unrs/resolver-binding-darwin-x64@1.10.1': + '@unrs/resolver-binding-darwin-x64@1.11.0': optional: true - '@unrs/resolver-binding-freebsd-x64@1.10.1': + '@unrs/resolver-binding-freebsd-x64@1.11.0': optional: true - '@unrs/resolver-binding-linux-arm-gnueabihf@1.10.1': + '@unrs/resolver-binding-linux-arm-gnueabihf@1.11.0': optional: true - '@unrs/resolver-binding-linux-arm-musleabihf@1.10.1': + '@unrs/resolver-binding-linux-arm-musleabihf@1.11.0': optional: true - '@unrs/resolver-binding-linux-arm64-gnu@1.10.1': + '@unrs/resolver-binding-linux-arm64-gnu@1.11.0': optional: true - '@unrs/resolver-binding-linux-arm64-musl@1.10.1': + '@unrs/resolver-binding-linux-arm64-musl@1.11.0': optional: true - '@unrs/resolver-binding-linux-ppc64-gnu@1.10.1': + '@unrs/resolver-binding-linux-ppc64-gnu@1.11.0': optional: true - '@unrs/resolver-binding-linux-riscv64-gnu@1.10.1': + '@unrs/resolver-binding-linux-riscv64-gnu@1.11.0': optional: true - '@unrs/resolver-binding-linux-riscv64-musl@1.10.1': + '@unrs/resolver-binding-linux-riscv64-musl@1.11.0': optional: true - '@unrs/resolver-binding-linux-s390x-gnu@1.10.1': + '@unrs/resolver-binding-linux-s390x-gnu@1.11.0': optional: true - '@unrs/resolver-binding-linux-x64-gnu@1.10.1': + '@unrs/resolver-binding-linux-x64-gnu@1.11.0': optional: true - '@unrs/resolver-binding-linux-x64-musl@1.10.1': + '@unrs/resolver-binding-linux-x64-musl@1.11.0': optional: true - '@unrs/resolver-binding-wasm32-wasi@1.10.1': + '@unrs/resolver-binding-wasm32-wasi@1.11.0': dependencies: '@napi-rs/wasm-runtime': 0.2.11 optional: true - '@unrs/resolver-binding-win32-arm64-msvc@1.10.1': + '@unrs/resolver-binding-win32-arm64-msvc@1.11.0': optional: true - '@unrs/resolver-binding-win32-ia32-msvc@1.10.1': + '@unrs/resolver-binding-win32-ia32-msvc@1.11.0': optional: true - '@unrs/resolver-binding-win32-x64-msvc@1.10.1': + '@unrs/resolver-binding-win32-x64-msvc@1.11.0': optional: true JSONStream@1.3.5: @@ -3918,52 +3922,53 @@ snapshots: shebang-command: 2.0.0 which: 2.0.2 - cspell-config-lib@9.1.2: + cspell-config-lib@9.1.3: dependencies: - '@cspell/cspell-types': 9.1.2 + '@cspell/cspell-types': 9.1.3 comment-json: 4.2.5 + smol-toml: 1.4.1 yaml: 2.8.0 - cspell-dictionary@9.1.2: + cspell-dictionary@9.1.3: dependencies: - '@cspell/cspell-pipe': 9.1.2 - '@cspell/cspell-types': 9.1.2 - cspell-trie-lib: 9.1.2 + '@cspell/cspell-pipe': 9.1.3 + '@cspell/cspell-types': 9.1.3 + cspell-trie-lib: 9.1.3 fast-equals: 5.2.2 - cspell-glob@9.1.2: + cspell-glob@9.1.3: dependencies: - '@cspell/url': 9.1.2 + '@cspell/url': 9.1.3 picomatch: 4.0.2 - cspell-grammar@9.1.2: + cspell-grammar@9.1.3: dependencies: - '@cspell/cspell-pipe': 9.1.2 - '@cspell/cspell-types': 9.1.2 + '@cspell/cspell-pipe': 9.1.3 + '@cspell/cspell-types': 9.1.3 - cspell-io@9.1.2: + cspell-io@9.1.3: dependencies: - '@cspell/cspell-service-bus': 9.1.2 - '@cspell/url': 9.1.2 + '@cspell/cspell-service-bus': 9.1.3 + '@cspell/url': 9.1.3 - cspell-lib@9.1.2: + cspell-lib@9.1.3: dependencies: - '@cspell/cspell-bundled-dicts': 9.1.2 - '@cspell/cspell-pipe': 9.1.2 - '@cspell/cspell-resolver': 9.1.2 - '@cspell/cspell-types': 9.1.2 - '@cspell/dynamic-import': 9.1.2 - '@cspell/filetypes': 9.1.2 - '@cspell/strong-weak-map': 9.1.2 - '@cspell/url': 9.1.2 + '@cspell/cspell-bundled-dicts': 9.1.3 + '@cspell/cspell-pipe': 9.1.3 + '@cspell/cspell-resolver': 9.1.3 + '@cspell/cspell-types': 9.1.3 + '@cspell/dynamic-import': 9.1.3 + '@cspell/filetypes': 9.1.3 + '@cspell/strong-weak-map': 9.1.3 + '@cspell/url': 9.1.3 clear-module: 4.1.2 comment-json: 4.2.5 - cspell-config-lib: 9.1.2 - cspell-dictionary: 9.1.2 - cspell-glob: 9.1.2 - cspell-grammar: 9.1.2 - cspell-io: 9.1.2 - cspell-trie-lib: 9.1.2 + cspell-config-lib: 9.1.3 + cspell-dictionary: 9.1.3 + cspell-glob: 9.1.3 + cspell-grammar: 9.1.3 + cspell-io: 9.1.3 + cspell-trie-lib: 9.1.3 env-paths: 3.0.0 fast-equals: 5.2.2 gensequence: 7.0.0 @@ -3973,10 +3978,10 @@ snapshots: vscode-uri: 3.1.0 xdg-basedir: 5.1.0 - cspell-trie-lib@9.1.2: + cspell-trie-lib@9.1.3: dependencies: - '@cspell/cspell-pipe': 9.1.2 - '@cspell/cspell-types': 9.1.2 + '@cspell/cspell-pipe': 9.1.3 + '@cspell/cspell-types': 9.1.3 gensequence: 7.0.0 dargs@8.1.0: {} @@ -4192,12 +4197,12 @@ snapshots: eslint: 9.30.1(jiti@2.4.2) semver: 7.7.2 - eslint-import-context@0.1.9(unrs-resolver@1.10.1): + eslint-import-context@0.1.9(unrs-resolver@1.11.0): dependencies: get-tsconfig: 4.10.1 stable-hash-x: 0.2.0 optionalDependencies: - unrs-resolver: 1.10.1 + unrs-resolver: 1.11.0 eslint-import-resolver-node@0.3.9: dependencies: @@ -4217,7 +4222,7 @@ snapshots: is-bun-module: 2.0.0 stable-hash: 0.0.5 tinyglobby: 0.2.14 - unrs-resolver: 1.10.1 + unrs-resolver: 1.11.0 optionalDependencies: eslint-plugin-import-x: 4.16.1(@typescript-eslint/utils@8.35.1(eslint@9.30.1(jiti@2.4.2))(typescript@5.8.3))(eslint-import-resolver-node@0.3.9)(eslint@9.30.1(jiti@2.4.2)) transitivePeerDependencies: @@ -4236,12 +4241,12 @@ snapshots: comment-parser: 1.4.1 debug: 4.4.1(supports-color@8.1.1) eslint: 9.30.1(jiti@2.4.2) - eslint-import-context: 0.1.9(unrs-resolver@1.10.1) + eslint-import-context: 0.1.9(unrs-resolver@1.11.0) is-glob: 4.0.3 minimatch: 10.0.3 semver: 7.7.2 stable-hash-x: 0.2.0 - unrs-resolver: 1.10.1 + unrs-resolver: 1.11.0 optionalDependencies: '@typescript-eslint/utils': 8.35.1(eslint@9.30.1(jiti@2.4.2))(typescript@5.8.3) eslint-import-resolver-node: 0.3.9 @@ -5405,6 +5410,8 @@ snapshots: smob@1.5.0: {} + smol-toml@1.4.1: {} + source-map-support@0.5.21: dependencies: buffer-from: 1.1.2 @@ -5652,29 +5659,29 @@ snapshots: universalify@2.0.1: {} - unrs-resolver@1.10.1: + unrs-resolver@1.11.0: dependencies: napi-postinstall: 0.3.0 optionalDependencies: - '@unrs/resolver-binding-android-arm-eabi': 1.10.1 - '@unrs/resolver-binding-android-arm64': 1.10.1 - '@unrs/resolver-binding-darwin-arm64': 1.10.1 - '@unrs/resolver-binding-darwin-x64': 1.10.1 - '@unrs/resolver-binding-freebsd-x64': 1.10.1 - '@unrs/resolver-binding-linux-arm-gnueabihf': 1.10.1 - '@unrs/resolver-binding-linux-arm-musleabihf': 1.10.1 - '@unrs/resolver-binding-linux-arm64-gnu': 1.10.1 - '@unrs/resolver-binding-linux-arm64-musl': 1.10.1 - '@unrs/resolver-binding-linux-ppc64-gnu': 1.10.1 - '@unrs/resolver-binding-linux-riscv64-gnu': 1.10.1 - '@unrs/resolver-binding-linux-riscv64-musl': 1.10.1 - '@unrs/resolver-binding-linux-s390x-gnu': 1.10.1 - '@unrs/resolver-binding-linux-x64-gnu': 1.10.1 - '@unrs/resolver-binding-linux-x64-musl': 1.10.1 - '@unrs/resolver-binding-wasm32-wasi': 1.10.1 - '@unrs/resolver-binding-win32-arm64-msvc': 1.10.1 - '@unrs/resolver-binding-win32-ia32-msvc': 1.10.1 - '@unrs/resolver-binding-win32-x64-msvc': 1.10.1 + '@unrs/resolver-binding-android-arm-eabi': 1.11.0 + '@unrs/resolver-binding-android-arm64': 1.11.0 + '@unrs/resolver-binding-darwin-arm64': 1.11.0 + '@unrs/resolver-binding-darwin-x64': 1.11.0 + '@unrs/resolver-binding-freebsd-x64': 1.11.0 + '@unrs/resolver-binding-linux-arm-gnueabihf': 1.11.0 + '@unrs/resolver-binding-linux-arm-musleabihf': 1.11.0 + '@unrs/resolver-binding-linux-arm64-gnu': 1.11.0 + '@unrs/resolver-binding-linux-arm64-musl': 1.11.0 + '@unrs/resolver-binding-linux-ppc64-gnu': 1.11.0 + '@unrs/resolver-binding-linux-riscv64-gnu': 1.11.0 + '@unrs/resolver-binding-linux-riscv64-musl': 1.11.0 + '@unrs/resolver-binding-linux-s390x-gnu': 1.11.0 + '@unrs/resolver-binding-linux-x64-gnu': 1.11.0 + '@unrs/resolver-binding-linux-x64-musl': 1.11.0 + '@unrs/resolver-binding-wasm32-wasi': 1.11.0 + '@unrs/resolver-binding-win32-arm64-msvc': 1.11.0 + '@unrs/resolver-binding-win32-ia32-msvc': 1.11.0 + '@unrs/resolver-binding-win32-x64-msvc': 1.11.0 uri-js@4.4.1: dependencies: diff --git a/src/circular-buffer.ts b/src/circular-buffer.ts index bcd5920a9..8bd510cdf 100644 --- a/src/circular-buffer.ts +++ b/src/circular-buffer.ts @@ -4,7 +4,7 @@ export const defaultBufferSize = 2048 /** - * Circular buffer designed for positive numbers. + * Circular buffer designed for numbers. * @internal */ export class CircularBuffer { @@ -24,7 +24,7 @@ export class CircularBuffer { this.writeIdx = 0 this.maxArrayIdx = size - 1 this.size = 0 - this.items = new Float32Array(size).fill(-1) + this.items = new Float32Array(size) } /** @@ -48,11 +48,10 @@ export class CircularBuffer { * @returns Number from buffer. */ public get (): number | undefined { - const number = this.items[this.readIdx] - if (number === -1) { - return + if (this.empty()) { + return undefined } - this.items[this.readIdx] = -1 + const number = this.items[this.readIdx] this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1 --this.size return number @@ -63,11 +62,13 @@ export class CircularBuffer { * @param number - Number to put into buffer. */ public put (number: number): void { - this.items[this.writeIdx] = number - this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1 - if (this.size < this.items.length) { + if (this.full()) { + this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1 + } else { ++this.size } + this.items[this.writeIdx] = number + this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1 } /** @@ -75,7 +76,16 @@ export class CircularBuffer { * @returns Numbers' array. */ public toArray (): number[] { - return Array.from(this.items.filter(item => item !== -1)) + const array: number[] = [] + if (this.empty()) { + return array + } + let currentIdx = this.readIdx + for (let i = 0; i < this.size; i++) { + array.push(this.items[currentIdx]) + currentIdx = currentIdx === this.maxArrayIdx ? 0 : currentIdx + 1 + } + return array } /** diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 639a967e7..ee3b632bb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2245,56 +2245,59 @@ export abstract class AbstractPool< private async sendTaskFunctionOperationToWorkers ( message: MessageValue ): Promise { - return await new Promise((resolve, reject) => { - const responsesReceived: MessageValue[] = [] - const taskFunctionOperationsListener = ( - message: MessageValue - ): void => { - this.checkMessageWorkerId(message) - if (message.taskFunctionOperationStatus != null) { - responsesReceived.push(message) - if (responsesReceived.length === this.workerNodes.length) { - if ( - responsesReceived.every( - message => message.taskFunctionOperationStatus === true - ) - ) { - resolve(true) - } else if ( - responsesReceived.some( - message => message.taskFunctionOperationStatus === false - ) - ) { - const errorResponse = responsesReceived.find( - response => response.taskFunctionOperationStatus === false - ) - reject( - new Error( - `Task function operation '${ - message.taskFunctionOperation as string - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - }' failed on worker ${errorResponse?.workerId?.toString()} with error: '${ - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - errorResponse?.workerError?.message - }'` - ) + const taskFunctionOperationsListener = ( + message: MessageValue, + resolve: (value: boolean | PromiseLike) => void, + reject: (reason?: unknown) => void, + responsesReceived: MessageValue[] + ): void => { + this.checkMessageWorkerId(message) + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if (responsesReceived.length >= this.workerNodes.length) { + if ( + responsesReceived.every( + msg => msg.taskFunctionOperationStatus === true + ) + ) { + resolve(true) + } else { + const errorResponse = responsesReceived.find( + msg => msg.taskFunctionOperationStatus === false + ) + reject( + new Error( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Task function operation '${message.taskFunctionOperation as string}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error?.message ?? 'Unknown error'}'` ) - } - this.deregisterWorkerMessageListener( - this.getWorkerNodeKeyByWorkerId(message.workerId), - taskFunctionOperationsListener ) } } } + } + let listener: (message: MessageValue) => void + try { + return await new Promise((resolve, reject) => { + const responsesReceived: MessageValue[] = [] + listener = (message: MessageValue) => { + taskFunctionOperationsListener( + message, + resolve, + reject, + responsesReceived + ) + } + for (const workerNodeKey of this.workerNodes.keys()) { + this.registerWorkerMessageListener(workerNodeKey, listener) + this.sendToWorker(workerNodeKey, message) + } + }) + } finally { for (const workerNodeKey of this.workerNodes.keys()) { - this.registerWorkerMessageListener( - workerNodeKey, - taskFunctionOperationsListener - ) - this.sendToWorker(workerNodeKey, message) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.deregisterWorkerMessageListener(workerNodeKey, listener!) } - }) + } } private setTasksQueuePriority (workerNodeKey: number): void { diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index acde2e3e5..68d91ed34 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -83,16 +83,21 @@ export abstract class AbstractWorkerChoiceStrategy< public abstract update (workerNodeKey: number): boolean /** - * Check the next worker node key. + * Check the worker node key. + * @param workerNodeKey - The worker node key to check. + * @returns The worker node key if it is valid, otherwise undefined. */ - protected checkNextWorkerNodeKey (): void { + protected checkWorkerNodeKey ( + workerNodeKey: number | undefined + ): number | undefined { if ( - this.nextWorkerNodeKey != null && - (this.nextWorkerNodeKey < 0 || - !this.isWorkerNodeReady(this.nextWorkerNodeKey)) + workerNodeKey == null || + workerNodeKey < 0 || + workerNodeKey >= this.pool.workerNodes.length ) { - delete this.nextWorkerNodeKey + return undefined } + return workerNodeKey } /** diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 44f39b578..e5e07ecd1 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -60,7 +60,15 @@ export class FairShareWorkerChoiceStrategy< } /** @inheritDoc */ - public remove (): boolean { + public remove (workerNodeKey: number): boolean { + if ( + this.pool.workerNodes[workerNodeKey]?.strategyData + ?.virtualTaskEndTimestamp != null + ) { + this.pool.workerNodes[ + workerNodeKey + ].strategyData.virtualTaskEndTimestamp = undefined + } return true } @@ -96,24 +104,34 @@ export class FairShareWorkerChoiceStrategy< } private fairShareNextWorkerNodeKey (): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + workerNode.strategyData = { + virtualTaskEndTimestamp: + this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey), + } + return workerNodeKey + } if (workerNode.strategyData?.virtualTaskEndTimestamp == null) { workerNode.strategyData = { virtualTaskEndTimestamp: this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey), } } - return this.isWorkerNodeReady(workerNodeKey) && + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return workerNode.strategyData.virtualTaskEndTimestamp! < // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNode.strategyData.virtualTaskEndTimestamp! < - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp! + workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp! ? workerNodeKey : minWorkerNodeKey }, - 0 + -1 ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } private getWorkerNodeVirtualTaskEndTimestamp ( diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 5a3310e87..7c37685fa 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -113,16 +113,17 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< return true } if ( - this.workerNodeId === workerNodeKey && - this.workerNodeId > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.workerNodeId = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length } - if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 - ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + if (this.workerNodeId >= workerNodeKey) { + this.workerNodeId = + (this.workerNodeId - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length } return true } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index ef61c72a0..cba9e17ee 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -70,17 +70,23 @@ export class LeastBusyWorkerChoiceStrategy< } private leastBusyNextWorkerNodeKey (): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - (workerNode.usage.waitTime.aggregate ?? 0) + - (workerNode.usage.runTime.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) + - (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return (workerNode.usage.waitTime.aggregate ?? 0) + + (workerNode.usage.runTime.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) + + (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, - 0 + -1 ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 7ac70513a..5a52a280a 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -66,15 +66,21 @@ export class LeastEluWorkerChoiceStrategy< } private leastEluNextWorkerNodeKey (): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - (workerNode.usage.elu.active.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return (workerNode.usage.elu.active.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, - 0 + -1 ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 5cdcb3b72..5d877ba9a 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -51,16 +51,23 @@ export class LeastUsedWorkerChoiceStrategy< } private leastUsedNextWorkerNodeKey (): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - workerNode.usage.tasks.executing + workerNode.usage.tasks.queued < - workerNodes[minWorkerNodeKey].usage.tasks.executing + - workerNodes[minWorkerNodeKey].usage.tasks.queued + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return workerNode.usage.tasks.executing + + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.executing + + workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey : minWorkerNodeKey }, - 0 + -1 ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index a70b72e92..5967f5493 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -30,30 +30,30 @@ export class RoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number | undefined { - const chosenWorkerNodeKey = this.nextWorkerNodeKey - this.setPreviousWorkerNodeKey(chosenWorkerNodeKey) + this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey) this.roundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeKey() - return chosenWorkerNodeKey + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) { + return undefined + } + return this.checkWorkerNodeKey(this.nextWorkerNodeKey) } /** @inheritDoc */ public remove (workerNodeKey: number): boolean { if (this.pool.workerNodes.length === 0) { - this.reset() - return true - } - if ( - this.nextWorkerNodeKey === workerNodeKey && - this.nextWorkerNodeKey > this.pool.workerNodes.length - 1 - ) { - this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 + return this.reset() } if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length + if (this.previousWorkerNodeKey >= workerNodeKey) { + this.previousWorkerNodeKey = this.nextWorkerNodeKey + } } return true } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index edc54cd5b..769a5c02b 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -57,27 +57,31 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public choose (): number | undefined { this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey) this.weightedRoundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeKey() - return this.nextWorkerNodeKey + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) { + return undefined + } + return this.checkWorkerNodeKey(this.nextWorkerNodeKey) } /** @inheritDoc */ public remove (workerNodeKey: number): boolean { if (this.pool.workerNodes.length === 0) { - this.reset() - return true + return this.reset() } if (this.nextWorkerNodeKey === workerNodeKey) { this.workerNodeVirtualTaskExecutionTime = 0 - if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) { - this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 - } } if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length + if (this.previousWorkerNodeKey >= workerNodeKey) { + this.previousWorkerNodeKey = this.nextWorkerNodeKey + } } return true } @@ -95,22 +99,18 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } private weightedRoundRobinNextWorkerNodeKey (): number | undefined { - const workerWeight = - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.opts!.weights![this.nextWorkerNodeKey ?? this.previousWorkerNodeKey] + const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const workerWeight = this.opts!.weights![workerNodeKey] if (this.workerNodeVirtualTaskExecutionTime < workerWeight) { this.workerNodeVirtualTaskExecutionTime += - this.getWorkerNodeTaskWaitTime( - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey - ) + - this.getWorkerNodeTaskRunTime( - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey - ) + this.getWorkerNodeTaskWaitTime(workerNodeKey) + + this.getWorkerNodeTaskRunTime(workerNodeKey) } else { this.nextWorkerNodeKey = this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 ? 0 - : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + : workerNodeKey + 1 this.workerNodeVirtualTaskExecutionTime = 0 } return this.nextWorkerNodeKey diff --git a/src/pools/selection-strategies/worker-choice-strategies-context.ts b/src/pools/selection-strategies/worker-choice-strategies-context.ts index 0e5085bec..fae1e1304 100644 --- a/src/pools/selection-strategies/worker-choice-strategies-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategies-context.ts @@ -239,17 +239,13 @@ export class WorkerChoiceStrategiesContext< * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. */ private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number { - let workerNodeKey: number | undefined - let chooseCount = 0 + let workerNodeKey: number | undefined = workerChoiceStrategy.choose() let retriesCount = 0 - do { + while (workerNodeKey == null && retriesCount < this.retries) { workerNodeKey = workerChoiceStrategy.choose() - if (workerNodeKey == null && chooseCount > 0) { - ++retriesCount - ++this.retriesCount - } - ++chooseCount - } while (workerNodeKey == null && retriesCount < this.retries) + retriesCount++ + this.retriesCount++ + } if (workerNodeKey == null) { throw new Error( `Worker node key chosen is null or undefined after ${retriesCount.toString()} retries` diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 8e33add0f..057c206a9 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -46,7 +46,6 @@ export class WorkerNode public usage: WorkerUsage /** @inheritdoc */ public readonly worker: Worker - private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -73,7 +72,6 @@ export class WorkerNode opts.tasksQueueBucketSize, opts.tasksQueuePriority ) - this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -101,14 +99,8 @@ export class WorkerNode /** @inheritdoc */ public dequeueTask (bucket?: number): Task | undefined { const task = this.tasksQueue.dequeue(bucket) - if ( - !this.setBackPressureFlag && - !this.hasBackPressure() && - this.info.backPressure - ) { - this.setBackPressureFlag = true + if (!this.hasBackPressure() && this.info.backPressure) { this.info.backPressure = false - this.setBackPressureFlag = false } return task } @@ -116,15 +108,9 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) - if ( - !this.setBackPressureFlag && - this.hasBackPressure() && - !this.info.backPressure - ) { - this.setBackPressureFlag = true + if (this.hasBackPressure() && !this.info.backPressure) { this.info.backPressure = true this.emit('backPressure', { workerId: this.info.id }) - this.setBackPressureFlag = false } return tasksQueueSize } diff --git a/src/queues/abstract-fixed-queue.ts b/src/queues/abstract-fixed-queue.ts index 9bbd32fe4..cd8969f67 100644 --- a/src/queues/abstract-fixed-queue.ts +++ b/src/queues/abstract-fixed-queue.ts @@ -13,7 +13,7 @@ export abstract class AbstractFixedQueue implements IFixedQueue { /** @inheritdoc */ public readonly capacity: number /** @inheritdoc */ - public nodeArray: FixedQueueNode[] + public nodeArray: (FixedQueueNode | undefined)[] /** @inheritdoc */ public size!: number protected start!: number @@ -38,11 +38,34 @@ export abstract class AbstractFixedQueue implements IFixedQueue { /** @inheritdoc */ public delete (data: T): boolean { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - const index = this.nodeArray.findIndex(node => node?.data === data) - if (index !== -1) { - this.nodeArray.splice(index, 1) - this.nodeArray.length = this.capacity + let currentPhysicalIndex = this.start + let logicalIndex = -1 + for (let i = 0; i < this.size; i++) { + if (this.nodeArray[currentPhysicalIndex]?.data === data) { + logicalIndex = i + break + } + currentPhysicalIndex++ + if (currentPhysicalIndex === this.capacity) { + currentPhysicalIndex = 0 + } + } + if (logicalIndex !== -1) { + if (logicalIndex === this.size - 1) { + this.nodeArray[currentPhysicalIndex] = undefined + --this.size + return true + } + let physicalShiftIndex = currentPhysicalIndex + for (let i = logicalIndex; i < this.size - 1; i++) { + let nextPhysicalIndex = physicalShiftIndex + 1 + if (nextPhysicalIndex === this.capacity) { + nextPhysicalIndex = 0 + } + this.nodeArray[physicalShiftIndex] = this.nodeArray[nextPhysicalIndex] + physicalShiftIndex = nextPhysicalIndex + } + this.nodeArray[physicalShiftIndex] = undefined --this.size return true } @@ -55,12 +78,13 @@ export abstract class AbstractFixedQueue implements IFixedQueue { return undefined } const index = this.start - --this.size ++this.start if (this.start === this.capacity) { this.start = 0 } - return this.nodeArray[index].data + --this.size + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.nodeArray[index]!.data } /** @inheritdoc */ @@ -85,7 +109,8 @@ export abstract class AbstractFixedQueue implements IFixedQueue { if (index >= this.capacity) { index -= this.capacity } - return this.nodeArray[index].data + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.nodeArray[index]!.data } /** @inheritdoc */ @@ -100,7 +125,8 @@ export abstract class AbstractFixedQueue implements IFixedQueue { value: undefined, } } - const value = this.nodeArray[index].data + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const value = this.nodeArray[index]!.data ++index ++i if (index === this.capacity) { diff --git a/src/queues/fixed-priority-queue.ts b/src/queues/fixed-priority-queue.ts index 275a78403..d7f65b5ad 100644 --- a/src/queues/fixed-priority-queue.ts +++ b/src/queues/fixed-priority-queue.ts @@ -16,27 +16,35 @@ export class FixedPriorityQueue throw new Error('Fixed priority queue is full') } priority = priority ?? 0 - let inserted = false - let index = this.start + let insertionPhysicalIndex = -1 + let currentPhysicalIndex = this.start for (let i = 0; i < this.size; i++) { - if (this.nodeArray[index].priority > priority) { - this.nodeArray.splice(index, 0, { data, priority }) - this.nodeArray.length = this.capacity - inserted = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (this.nodeArray[currentPhysicalIndex]!.priority > priority) { + insertionPhysicalIndex = currentPhysicalIndex break } - ++index - if (index === this.capacity) { - index = 0 + currentPhysicalIndex++ + if (currentPhysicalIndex === this.capacity) { + currentPhysicalIndex = 0 } } - if (!inserted) { - let index = this.start + this.size - if (index >= this.capacity) { - index -= this.capacity + let end = this.start + this.size + if (end >= this.capacity) { + end -= this.capacity + } + if (insertionPhysicalIndex === -1) { + insertionPhysicalIndex = end + } else { + let toShiftIndex = end + while (toShiftIndex !== insertionPhysicalIndex) { + const previousIndex = + toShiftIndex === 0 ? this.capacity - 1 : toShiftIndex - 1 + this.nodeArray[toShiftIndex] = this.nodeArray[previousIndex] + toShiftIndex = previousIndex } - this.nodeArray[index] = { data, priority } } + this.nodeArray[insertionPhysicalIndex] = { data, priority } return ++this.size } } diff --git a/src/queues/priority-queue.ts b/src/queues/priority-queue.ts index 3b12b82e0..ee6a0d26b 100644 --- a/src/queues/priority-queue.ts +++ b/src/queues/priority-queue.ts @@ -17,7 +17,8 @@ import { export class PriorityQueue { /** The priority queue maximum size. */ public maxSize!: number - + /** The priority queue size. */ + public size!: number /** * The number of filled prioritized buckets. * @returns The number of filled prioritized buckets. @@ -43,44 +44,11 @@ export class PriorityQueue { return } this.priorityEnabled = enablePriority - let head: PriorityQueueNode - let tail: PriorityQueueNode - let prev: PriorityQueueNode | undefined - let node: PriorityQueueNode | undefined = this.tail - let buckets = 0 - while (node != null) { - const currentNode = this.getPriorityQueueNode(node.nodeArray) - if (buckets === 0) { - tail = currentNode - } - if (prev != null) { - prev.next = currentNode - } - prev = currentNode - if (node.next == null) { - head = currentNode - } - ++buckets - node = node.next - } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.head = head! - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.tail = tail! - } - - /** - * The priority queue size. - * @returns The priority queue size. - */ - public get size (): number { - let node: PriorityQueueNode | undefined = this.tail - let size = 0 - while (node != null) { - size += node.size - node = node.next + const data: T[] = Array.from(this) + this.clear() + for (const dataItem of data) { + this.enqueue(dataItem) } - return size } private readonly bucketSize: number @@ -116,6 +84,7 @@ export class PriorityQueue { */ public clear (): void { this.head = this.tail = this.getPriorityQueueNode() + this.size = 0 this.maxSize = 0 } @@ -129,18 +98,20 @@ export class PriorityQueue { let prev: PriorityQueueNode | undefined while (node != null) { if (node.delete(data)) { - if (node.empty()) { - if (node === this.tail && node.next != null) { - this.tail = node.next - delete node.next - } else if (node.next != null && prev != null) { - prev.next = node.next - delete node.next - } else if (node.next == null && prev != null) { - delete prev.next - this.head = prev + if (node.empty() && this.head !== this.tail) { + if (node === this.tail) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.tail = node.next! + } else { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + prev!.next = node.next + if (node === this.head) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.head = prev! + } } } + --this.size return true } prev = node @@ -155,47 +126,44 @@ export class PriorityQueue { * @returns The dequeued data or `undefined` if the priority queue is empty. */ public dequeue (bucket?: number): T | undefined { - let tail: PriorityQueueNode | undefined = this.tail - let tailChanged = false + if (this.size === 0) { + return undefined + } + let targetNode: PriorityQueueNode | undefined = this.tail + let prev: PriorityQueueNode | undefined if (bucket != null && bucket > 0) { let currentBucket = 1 - while (tail != null) { - if (currentBucket === bucket) { - break - } + while (targetNode.next != null && currentBucket < bucket) { + prev = targetNode + targetNode = targetNode.next ++currentBucket - tail = tail.next } - tailChanged = tail !== this.tail + if (currentBucket < bucket || targetNode.empty()) { + return undefined + } + } else { + while (targetNode?.empty() === true && targetNode !== this.head) { + prev = targetNode + targetNode = targetNode.next + } + } + if (targetNode?.empty() === true) { + return undefined } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const data = tail!.dequeue() + const data = targetNode!.dequeue() + --this.size // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (tail!.empty()) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (!tailChanged && tail!.next != null) { + if (targetNode!.empty() && this.head !== this.tail) { + if (targetNode === this.tail) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.tail = tail!.next + this.tail = this.tail.next! + } else { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - delete tail!.next - } else if (tailChanged) { - let node: PriorityQueueNode | undefined = this.tail - while (node != null) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (node.next === tail && tail!.next != null) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - node.next = tail!.next - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - delete tail!.next - break - } + prev!.next = targetNode!.next + if (targetNode === this.head) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (node.next === tail && tail!.next == null) { - delete node.next - this.head = node - break - } - node = node.next + this.head = prev! } } } @@ -213,11 +181,11 @@ export class PriorityQueue { this.head = this.head.next = this.getPriorityQueueNode() } this.head.enqueue(data, priority) - const size = this.size - if (size > this.maxSize) { - this.maxSize = size + ++this.size + if (this.size > this.maxSize) { + this.maxSize = this.size } - return size + return this.size } /** @@ -226,32 +194,37 @@ export class PriorityQueue { * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols */ public [Symbol.iterator] (): Iterator { + let node: PriorityQueueNode | undefined = this.tail let index = 0 - let node = this.tail - return { - next: () => { - const value = node.get(index) as T - if (value == null) { - return { - done: true, - value: undefined, - } + const getNextValue = (): IteratorResult => { + if (node == null) { + return { done: true, value: undefined } + } + + while (index >= node.size) { + node = node.next + index = 0 + if (node == null) { + return { done: true, value: undefined } } + } + + const value = node.get(index) + if (value == null) { ++index - if (index === node.capacity && node.next != null) { - node = node.next - index = 0 - } - return { - done: false, - value, - } - }, + return getNextValue() + } + + ++index + return { done: false, value } + } + return { + next: getNextValue, } } private getPriorityQueueNode ( - nodeArray?: FixedQueueNode[] + nodeArray?: (FixedQueueNode | undefined)[] ): PriorityQueueNode { let fixedQueue: IFixedQueue if (this.priorityEnabled) { diff --git a/src/queues/queue-types.ts b/src/queues/queue-types.ts index c8aba72eb..c5710d422 100644 --- a/src/queues/queue-types.ts +++ b/src/queues/queue-types.ts @@ -68,7 +68,7 @@ export interface IFixedQueue { */ get: (index: number) => T | undefined /** The fixed queue node array. */ - nodeArray: FixedQueueNode[] + nodeArray: (FixedQueueNode | undefined)[] /** The fixed queue size. */ readonly size: number } diff --git a/src/worker/utils.ts b/src/worker/utils.ts index 1b0ef6f9e..28822508f 100644 --- a/src/worker/utils.ts +++ b/src/worker/utils.ts @@ -23,11 +23,8 @@ export const checkValidWorkerOptions = ( } if ( opts?.maxInactiveTime != null && - !Number.isSafeInteger(opts.maxInactiveTime) + (!Number.isSafeInteger(opts.maxInactiveTime) || opts.maxInactiveTime < 5) ) { - throw new TypeError('maxInactiveTime option is not an integer') - } - if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) { throw new TypeError( 'maxInactiveTime option is not a positive integer greater or equal than 5' ) @@ -44,14 +41,7 @@ export const checkValidTaskFunctionObjectEntry = < name: string, fnObj: TaskFunctionObject ): void => { - if (typeof name !== 'string') { - throw new TypeError('A taskFunctions parameter object key is not a string') - } - if (typeof name === 'string' && name.trim().length === 0) { - throw new TypeError( - 'A taskFunctions parameter object key is an empty string' - ) - } + checkTaskFunctionName(name) if (typeof fnObj.taskFunction !== 'function') { throw new TypeError( // eslint-disable-next-line @typescript-eslint/restrict-template-expressions @@ -66,7 +56,7 @@ export const checkTaskFunctionName = (name: string): void => { if (typeof name !== 'string') { throw new TypeError('name parameter is not a string') } - if (typeof name === 'string' && name.trim().length === 0) { + if (name.trim().length === 0) { throw new TypeError('name parameter is an empty string') } } diff --git a/tests/circular-buffer.test.mjs b/tests/circular-buffer.test.mjs index c1a1a4f59..b597aafe8 100644 --- a/tests/circular-buffer.test.mjs +++ b/tests/circular-buffer.test.mjs @@ -41,29 +41,33 @@ describe('Circular buffer test suite', () => { it('Verify that circular buffer put() works as intended', () => { const circularBuffer = new CircularBuffer(4) circularBuffer.put(1) - expect(circularBuffer.items).toStrictEqual( - new Float32Array([1, -1, -1, -1]) - ) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 0, 0, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(1) expect(circularBuffer.size).toBe(1) circularBuffer.put(2) - expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, -1, -1])) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 0, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(2) expect(circularBuffer.size).toBe(2) circularBuffer.put(3) - expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, -1])) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(3) expect(circularBuffer.size).toBe(3) circularBuffer.put(4) expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, 4])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(0) expect(circularBuffer.size).toBe(4) circularBuffer.put(5) expect(circularBuffer.items).toStrictEqual(new Float32Array([5, 2, 3, 4])) + expect(circularBuffer.readIdx).toBe(1) expect(circularBuffer.writeIdx).toBe(1) expect(circularBuffer.size).toBe(4) circularBuffer.put(6) expect(circularBuffer.items).toStrictEqual(new Float32Array([5, 6, 3, 4])) + expect(circularBuffer.readIdx).toBe(2) expect(circularBuffer.writeIdx).toBe(2) expect(circularBuffer.size).toBe(4) }) @@ -149,11 +153,16 @@ describe('Circular buffer test suite', () => { it('Verify that circular buffer toArray() works as intended', () => { const circularBuffer = new CircularBuffer(4) circularBuffer.put(1) + expect(circularBuffer.toArray()).toStrictEqual([1]) circularBuffer.put(2) + expect(circularBuffer.toArray()).toStrictEqual([1, 2]) circularBuffer.put(3) + expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3]) circularBuffer.put(4) + expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3, 4]) circularBuffer.put(5) + expect(circularBuffer.toArray()).toStrictEqual([2, 3, 4, 5]) circularBuffer.put(6) - expect(circularBuffer.toArray()).toStrictEqual([5, 6, 3, 4]) + expect(circularBuffer.toArray()).toStrictEqual([3, 4, 5, 6]) }) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index 949cb3d92..89d1e3af9 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -336,12 +336,12 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBe(pool.workerNodes.length - 1) expect( pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(pool.workerNodes.length - 1) + ).toBe(pool.workerNodes.length - 2) // We need to clean up the resources after our test await pool.destroy() }) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 709c67699..6972a5aea 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -292,7 +292,6 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size ) - expect(threadWorkerNode.setBackPressureFlag).toBe(false) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) expect(clusterWorkerNode).toBeInstanceOf(WorkerNode) @@ -355,7 +354,6 @@ describe('Worker node test suite', () => { expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size ) - expect(clusterWorkerNode.setBackPressureFlag).toBe(false) expect(clusterWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) diff --git a/tests/worker/abstract-worker.test.mjs b/tests/worker/abstract-worker.test.mjs index 68238dea7..3ab729e9c 100644 --- a/tests/worker/abstract-worker.test.mjs +++ b/tests/worker/abstract-worker.test.mjs @@ -41,10 +41,14 @@ describe('Abstract worker test suite', () => { new TypeError("killBehavior option '0' is not valid") ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: '' })).toThrow( - new TypeError('maxInactiveTime option is not an integer') + new TypeError( + 'maxInactiveTime option is not a positive integer greater or equal than 5' + ) ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0.5 })).toThrow( - new TypeError('maxInactiveTime option is not an integer') + new TypeError( + 'maxInactiveTime option is not a positive integer greater or equal than 5' + ) ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0 })).toThrow( new TypeError( @@ -155,7 +159,7 @@ describe('Abstract worker test suite', () => { } const fn2 = '' expect(() => new ThreadWorker({ '': fn1 })).toThrow( - new TypeError('A taskFunctions parameter object key is an empty string') + new TypeError('name parameter is an empty string') ) expect(() => new ThreadWorker({ fn1, fn2 })).toThrow( new TypeError( -- 2.43.0