chore(deps-dev): bump tatami-ng to 0.6.0
[poolifier.git] / src / worker / thread-worker.ts
... / ...
CommitLineData
1import {
2 isMainThread,
3 type MessagePort,
4 parentPort,
5 threadId,
6} from 'node:worker_threads'
7
8import type { MessageValue } from '../utility-types.js'
9import type { TaskFunction, TaskFunctions } from './task-functions.js'
10import type { WorkerOptions } from './worker-options.js'
11
12import { AbstractWorker } from './abstract-worker.js'
13
/**
 * Worker implementation executed inside a thread owned by a poolifier `ThreadPool`.
 *
 * Once the worker has been inactive for longer than the configured `maxInactiveTime`,
 * it asks its main thread to terminate it.
 *
 * With a `DynamicThreadPool`, the additional workers that were spawned get terminated,
 * while the minimum number of workers stays guaranteed.
 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
 * @typeParam Response - Type of response the worker sends back to the main thread. This can only be structured-cloneable data.
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class ThreadWorker<
  Data = unknown,
  Response = unknown
> extends AbstractWorker<MessagePort, Data, Response> {
  /** @inheritDoc */
  protected readonly sendToMainWorker = (
    message: MessageValue<Response>
  ): void => {
    const channel = this.port
    // Without a port there is nowhere to post to; nothing is built or sent.
    if (channel == null) {
      return
    }
    // Every outgoing message is stamped with this worker's id.
    const outgoing = {
      ...message,
      workerId: this.id,
    } satisfies MessageValue<Response>
    channel.postMessage(outgoing)
  }

  /**
   * Port over which this worker exchanges messages with the main worker.
   * It stays unset until a matching ready message delivers it.
   */
  private port?: MessagePort

  /**
   * Builds a new poolifier thread worker.
   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked.
   * @param opts - Options for the worker.
   */
  public constructor (
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
    opts: WorkerOptions = {}
  ) {
    super(isMainThread, parentPort, taskFunctions, opts)
  }

  /**
   * @inheritDoc
   */
  protected override handleError (error: Error | string): string {
    // The value is handed back untouched; the assertion only satisfies the declared return type.
    return error as string
  }

  /** @inheritDoc */
  protected override handleKillMessage (message: MessageValue<Data>): void {
    super.handleKillMessage(message)
    const channel = this.port
    if (channel != null) {
      // Release the port once the base class kill handling has run.
      channel.unref()
      channel.close()
    }
  }

  /** @inheritDoc */
  protected handleReadyMessage (message: MessageValue<Data>): void {
    // Only react to a not-yet-ready message addressed to this worker that carries a port.
    if (
      message.workerId !== this.id ||
      message.ready !== false ||
      message.port == null
    ) {
      return
    }
    const reportReady = (ready: boolean): void => {
      this.sendToMainWorker({
        ready,
        taskFunctionsProperties: this.listTaskFunctionsProperties(),
      })
    }
    try {
      const channel = message.port
      this.port = channel
      channel.on('message', this.messageListener.bind(this))
      reportReady(true)
    } catch {
      // Any failure during setup is reported back as "not ready".
      reportReady(false)
    }
  }

  /** @inheritDoc */
  protected get id (): number {
    return threadId
  }
}