Fix some format issues in MD files (#131)
[poolifier.git] / src / pools / thread / fixed.ts
CommitLineData
fa699c42 1import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
325f50bc 2import type { Draft, MessageValue } from '../../utility-types'
c97c7edb
S
3import type { PoolOptions } from '../abstract-pool'
4import { AbstractPool } from '../abstract-pool'
4ade5f1f 5
c97c7edb 6export type ThreadWorkerWithMessageChannel = Worker & Draft<MessageChannel>
4ade5f1f
S
7
8/**
9 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer.
10 *
11 * This pool will select the worker thread in a round robin fashion.
12 *
13 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
14 * @since 0.0.1
15 */
777b7824 16// eslint-disable-next-line @typescript-eslint/no-explicit-any
c97c7edb
S
17export class FixedThreadPool<Data = any, Response = any> extends AbstractPool<
18 ThreadWorkerWithMessageChannel,
19 Data,
20 Response
21> {
4ade5f1f
S
22 /**
23 * @param numThreads Num of threads for this worker pool.
24 * @param filePath A file path with implementation of `ThreadWorker` class, relative path is fine.
25 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
26 */
27 public constructor (
c97c7edb
S
28 numThreads: number,
29 filePath: string,
30 opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
4ade5f1f 31 ) {
c97c7edb
S
32 super(numThreads, filePath, opts)
33 }
4ade5f1f 34
c97c7edb
S
35 protected isMain (): boolean {
36 return isMainThread
4ade5f1f
S
37 }
38
c97c7edb
S
39 protected async destroyWorker (
40 worker: ThreadWorkerWithMessageChannel
41 ): Promise<void> {
42 await worker.terminate()
4ade5f1f
S
43 }
44
c97c7edb
S
45 protected sendToWorker (
46 worker: ThreadWorkerWithMessageChannel,
47 message: MessageValue<Data>
48 ): void {
49 worker.postMessage(message)
4ade5f1f
S
50 }
51
c97c7edb
S
52 protected registerWorkerMessageListener (
53 port: ThreadWorkerWithMessageChannel,
54 listener: (message: MessageValue<Response>) => void
55 ): void {
56 port.port2?.on('message', listener)
4ade5f1f
S
57 }
58
c97c7edb
S
59 protected unregisterWorkerMessageListener (
60 port: ThreadWorkerWithMessageChannel,
61 listener: (message: MessageValue<Response>) => void
62 ): void {
63 port.port2?.removeListener('message', listener)
4ade5f1f
S
64 }
65
c97c7edb
S
66 protected newWorker (): ThreadWorkerWithMessageChannel {
67 return new Worker(this.filePath, {
4ade5f1f
S
68 env: SHARE_ENV
69 })
c97c7edb
S
70 }
71
72 protected afterNewWorkerPushed (
73 worker: ThreadWorkerWithMessageChannel
74 ): void {
4ade5f1f
S
75 const { port1, port2 } = new MessageChannel()
76 worker.postMessage({ parent: port1 }, [port1])
77 worker.port1 = port1
78 worker.port2 = port2
79 // we will attach a listener for every task,
80 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
ee99693b 81 worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
4ade5f1f
S
82 }
83}