Simplify worker choosing (#138)
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fa699c42 1import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
d3c8a1a8 2import type { Draft, JSONValue, MessageValue } from '../../utility-types'
c97c7edb
S
3import type { PoolOptions } from '../abstract-pool'
4import { AbstractPool } from '../abstract-pool'
4ade5f1f 5
/**
 * A worker thread that can also carry the ports of a `MessageChannel`.
 *
 * `FixedThreadPool` assigns `port1` and `port2` on the worker after it has been created
 * and reads `port2` through optional chaining, so the ports may be absent on a fresh worker.
 *
 * NOTE(review): `Draft` is declared in `../../utility-types`; presumably it makes the
 * `MessageChannel` members optional and writable - confirm against its definition.
 */
export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
7
/**
 * A thread pool with a static number of threads. Tasks can be executed in sync or async mode, as you prefer.
 *
 * This pool will select the worker thread in a round robin fashion.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class FixedThreadPool<
  Data extends JSONValue = JSONValue,
  Response extends JSONValue = JSONValue
> extends AbstractPool<ThreadWorkerWithMessageChannel, Data, Response> {
  /**
   * @param numThreads Num of threads for this worker pool.
   * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    numThreads: number,
    filePath: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
  ) {
    super(numThreads, filePath, opts)
  }

  /**
   * Tells whether the current code is running on the main thread.
   *
   * @returns The value of `isMainThread` from `worker_threads`.
   */
  protected isMain (): boolean {
    return isMainThread
  }

  /**
   * Terminates the given worker thread.
   *
   * @param worker The worker to terminate.
   * @returns A promise that settles once `worker.terminate()` has settled.
   */
  protected async destroyWorker (
    worker: ThreadWorkerWithMessageChannel
  ): Promise<void> {
    await worker.terminate()
    // FIXME: The tests are currently failing, so these must be changed first
  }

  /**
   * Posts a message to the given worker through the worker's own `postMessage`.
   *
   * @param worker The worker that receives the message.
   * @param message The message to send.
   */
  protected sendToWorker (
    worker: ThreadWorkerWithMessageChannel,
    message: MessageValue<Data>
  ): void {
    worker.postMessage(message)
  }

  /**
   * Subscribes `listener` to `message` events on the worker's `port2`.
   *
   * Does nothing when `port2` has not been set on the worker.
   *
   * @param port The worker whose `port2` is listened on.
   * @param listener The callback invoked with every message received on `port2`.
   */
  protected registerWorkerMessageListener (
    port: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Response>) => void
  ): void {
    port.port2?.on('message', listener)
  }

  /**
   * Removes `listener` from the `message` events of the worker's `port2`.
   *
   * Does nothing when `port2` has not been set on the worker.
   *
   * @param port The worker whose `port2` the listener was attached to.
   * @param listener The callback to remove.
   */
  protected unregisterWorkerMessageListener (
    port: ThreadWorkerWithMessageChannel,
    listener: (message: MessageValue<Response>) => void
  ): void {
    port.port2?.removeListener('message', listener)
  }

  /**
   * Spawns a new worker thread running the file at `this.filePath`.
   *
   * @returns The newly created worker; its message channel ports are not set yet.
   */
  protected newWorker (): ThreadWorkerWithMessageChannel {
    return new Worker(this.filePath, {
      // SHARE_ENV makes the worker use the same environment variables as the parent thread
      env: SHARE_ENV
    })
  }

  /**
   * Sets up a dedicated `MessageChannel` for the given worker.
   *
   * `port1` is transferred to the worker inside a `{ parent: port1 }` message,
   * and both ports are stored on the worker object.
   *
   * @param worker The worker to set up.
   */
  protected afterNewWorkerPushed (
    worker: ThreadWorkerWithMessageChannel
  ): void {
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
  }
}