Optimize worker handler calls by binding them to the current instance
[e-mobility-charging-stations-simulator.git] / src / charging-station / ocpp / OCPPRequestService.ts
1 import type { JSONSchemaType } from 'ajv';
2 import Ajv from 'ajv-draft-04';
3 import ajvFormats from 'ajv-formats';
4
5 import OCPPError from '../../exception/OCPPError';
6 import PerformanceStatistics from '../../performance/PerformanceStatistics';
7 import type { EmptyObject } from '../../types/EmptyObject';
8 import type { HandleErrorParams } from '../../types/Error';
9 import type { JsonObject, JsonType } from '../../types/JsonType';
10 import { ErrorType } from '../../types/ocpp/ErrorType';
11 import { MessageType } from '../../types/ocpp/MessageType';
12 import {
13 ErrorCallback,
14 IncomingRequestCommand,
15 OutgoingRequest,
16 RequestCommand,
17 RequestParams,
18 ResponseCallback,
19 ResponseType,
20 } from '../../types/ocpp/Requests';
21 import type { ErrorResponse, Response } from '../../types/ocpp/Responses';
22 import Constants from '../../utils/Constants';
23 import logger from '../../utils/Logger';
24 import Utils from '../../utils/Utils';
25 import type ChargingStation from '../ChargingStation';
26 import type OCPPResponseService from './OCPPResponseService';
27 import { OCPPServiceUtils } from './OCPPServiceUtils';
28
const moduleName = 'OCPPRequestService';

/**
 * Base class of the services that build and send OCPP messages (requests,
 * responses and error responses) on behalf of a charging station.
 *
 * Concrete subclasses implement {@link OCPPRequestService.requestHandler}.
 */
export default abstract class OCPPRequestService {
  private static instance: OCPPRequestService | null = null;
  private readonly ajv: Ajv;

  private readonly ocppResponseService: OCPPResponseService;

  /**
   * @param ocppResponseService - Service whose `responseHandler()` is invoked with the
   *   response payload received for a sent request
   */
  protected constructor(ocppResponseService: OCPPResponseService) {
    this.ocppResponseService = ocppResponseService;
    this.ajv = new Ajv();
    ajvFormats(this.ajv);
    // Function.prototype.bind() returns a NEW function and leaves the original untouched:
    // the bound function must be assigned back for the binding to have any effect.
    this.requestHandler = this.requestHandler.bind(this);
    this.sendMessage = this.sendMessage.bind(this);
    this.sendResponse = this.sendResponse.bind(this);
    this.sendError = this.sendError.bind(this);
    this.internalSendMessage = this.internalSendMessage.bind(this);
    this.buildMessageToSend = this.buildMessageToSend.bind(this);
    this.validateRequestPayload = this.validateRequestPayload.bind(this);
  }

  /**
   * Returns the singleton instance, creating it on first call.
   *
   * The instance is stored on the base class: the first concrete subclass calling this
   * method determines the instance returned to every later caller, and the
   * `ocppResponseService` argument of later calls is ignored.
   *
   * @param ocppResponseService - Response service handed to the constructor on first call
   * @returns The singleton instance
   */
  public static getInstance<T extends OCPPRequestService>(
    this: new (ocppResponseService: OCPPResponseService) => T,
    ocppResponseService: OCPPResponseService
  ): T {
    if (OCPPRequestService.instance === null) {
      OCPPRequestService.instance = new this(ocppResponseService);
    }
    return OCPPRequestService.instance as T;
  }

  /**
   * Sends a response (CALL_RESULT) message.
   *
   * @param chargingStation - Charging station sending the message
   * @param messageId - Identifier of the message being answered
   * @param messagePayload - Response payload
   * @param commandName - Incoming request command being answered
   * @returns The value resolved by the internal send
   * @throws The sending error, after it has been logged
   */
  public async sendResponse(
    chargingStation: ChargingStation,
    messageId: string,
    messagePayload: JsonType,
    commandName: IncomingRequestCommand
  ): Promise<ResponseType> {
    try {
      // Send response message
      return await this.internalSendMessage(
        chargingStation,
        messageId,
        messagePayload,
        MessageType.CALL_RESULT_MESSAGE,
        commandName
      );
    } catch (error) {
      this.handleSendMessageError(chargingStation, commandName, error as Error, {
        throwError: true,
      });
    }
  }

  /**
   * Sends an error response (CALL_ERROR) message.
   *
   * A sending failure is logged and swallowed: the returned promise then resolves
   * with `undefined`.
   *
   * @param chargingStation - Charging station sending the message
   * @param messageId - Identifier of the message being answered
   * @param ocppError - Error to report
   * @param commandName - Command the error relates to
   * @returns The value resolved by the internal send, `undefined` on sending failure
   */
  public async sendError(
    chargingStation: ChargingStation,
    messageId: string,
    ocppError: OCPPError,
    commandName: RequestCommand | IncomingRequestCommand
  ): Promise<ResponseType> {
    try {
      // Send error message
      return await this.internalSendMessage(
        chargingStation,
        messageId,
        ocppError,
        MessageType.CALL_ERROR_MESSAGE,
        commandName
      );
    } catch (error) {
      this.handleSendMessageError(chargingStation, commandName, error as Error);
    }
  }

  /**
   * Sends a request (CALL) message.
   *
   * A sending failure is logged and swallowed: the returned promise then resolves
   * with `undefined`.
   *
   * @param chargingStation - Charging station sending the message
   * @param messageId - Identifier of the request
   * @param messagePayload - Request payload
   * @param commandName - Request command
   * @param params - Sending options
   * @returns The value resolved by the internal send, `undefined` on sending failure
   */
  protected async sendMessage(
    chargingStation: ChargingStation,
    messageId: string,
    messagePayload: JsonType,
    commandName: RequestCommand,
    params: RequestParams = {
      skipBufferingOnError: false,
      triggerMessage: false,
    }
  ): Promise<ResponseType> {
    try {
      return await this.internalSendMessage(
        chargingStation,
        messageId,
        messagePayload,
        MessageType.CALL_MESSAGE,
        commandName,
        params
      );
    } catch (error) {
      this.handleSendMessageError(chargingStation, commandName, error as Error);
    }
  }

  /**
   * Validates a request payload against a JSON schema.
   *
   * This method never returns `false`: an invalid payload is reported by throwing.
   *
   * @param chargingStation - Charging station whose configuration enables or disables validation
   * @param commandName - Request command, attached to the thrown error
   * @param schema - JSON schema the payload must conform to
   * @param payload - Payload to validate
   * @returns `true` when validation is disabled or the payload is valid
   * @throws OCPPError when the payload does not conform to the schema
   */
  protected validateRequestPayload<T extends JsonType>(
    chargingStation: ChargingStation,
    commandName: RequestCommand,
    schema: JSONSchemaType<T>,
    payload: T
  ): boolean {
    if (chargingStation.getPayloadSchemaValidation() === false) {
      return true;
    }
    const validate = this.ajv.compile(schema);
    if (validate(payload)) {
      return true;
    }
    logger.error(
      `${chargingStation.logPrefix()} ${moduleName}.validateRequestPayload: Request PDU is invalid: %j`,
      validate.errors
    );
    // OCPPError usage here is debatable: it's an error in the OCPP stack but not targeted to sendError().
    throw new OCPPError(
      OCPPServiceUtils.ajvErrorsToErrorType(validate.errors),
      'Request PDU is invalid',
      commandName,
      JSON.stringify(validate.errors, null, 2)
    );
  }

  /**
   * Builds a message and sends it through the charging station WebSocket connection,
   * or buffers it when the connection is not opened and buffering is allowed.
   *
   * Sending is only attempted when one of the following holds:
   * - the station is in unknown state and the command is BootNotification;
   * - OCPP strict compliance is disabled and the station is in unknown state;
   * - the station is in accepted state;
   * - the station is in pending state and the message is either a triggered message
   *   or a response (CALL_RESULT).
   *
   * @param chargingStation - Charging station sending the message
   * @param messageId - Identifier of the message
   * @param messagePayload - Payload, or the error to report for an error message
   * @param messageType - Kind of message to send
   * @param commandName - Command the message relates to
   * @param params - Sending options
   * @returns For a request, the response payload once the response handler has processed it;
   *   for a response or an error message, the sent payload
   * @throws OCPPError when the registration state of the station forbids sending
   */
  private async internalSendMessage(
    chargingStation: ChargingStation,
    messageId: string,
    messagePayload: JsonType | OCPPError,
    messageType: MessageType,
    commandName?: RequestCommand | IncomingRequestCommand,
    params: RequestParams = {
      skipBufferingOnError: false,
      triggerMessage: false,
    }
  ): Promise<ResponseType> {
    if (
      (chargingStation.isInUnknownState() === true &&
        commandName === RequestCommand.BOOT_NOTIFICATION) ||
      (chargingStation.getOcppStrictCompliance() === false &&
        chargingStation.isInUnknownState() === true) ||
      chargingStation.isInAcceptedState() === true ||
      (chargingStation.isInPendingState() === true &&
        (params.triggerMessage === true || messageType === MessageType.CALL_RESULT_MESSAGE))
    ) {
      // Send a message through wsConnection
      return Utils.promiseWithTimeout(
        new Promise((resolve, reject) => {
          /**
           * Function that will receive the request's response
           *
           * @param payload - Response payload
           * @param requestPayload - Payload of the request being answered
           */
          const responseCallback = (payload: JsonType, requestPayload: JsonType): void => {
            if (chargingStation.getEnableStatistics() === true) {
              chargingStation.performanceStatistics.addRequestStatistic(
                commandName,
                MessageType.CALL_RESULT_MESSAGE
              );
            }
            // Handle the request's response
            this.ocppResponseService
              .responseHandler(
                chargingStation,
                commandName as RequestCommand,
                payload,
                requestPayload
              )
              .then(() => {
                resolve(payload);
              })
              .catch((error) => {
                reject(error);
              })
              .finally(() => {
                chargingStation.requests.delete(messageId);
              });
          };

          /**
           * Function that will receive the request's error response
           *
           * @param error - Error to reject the sending promise with
           * @param requestStatistic - Whether to account the error in the request statistics
           */
          const errorCallback = (error: OCPPError, requestStatistic = true): void => {
            if (requestStatistic === true && chargingStation.getEnableStatistics() === true) {
              chargingStation.performanceStatistics.addRequestStatistic(
                commandName,
                MessageType.CALL_ERROR_MESSAGE
              );
            }
            logger.error(
              `${chargingStation.logPrefix()} Error occurred when calling command ${commandName} with message data ${JSON.stringify(
                messagePayload
              )}:`,
              error
            );
            chargingStation.requests.delete(messageId);
            reject(error);
          };

          // For a request, this also registers both callbacks in chargingStation.requests
          const messageToSend = this.buildMessageToSend(
            chargingStation,
            messageId,
            messagePayload,
            messageType,
            commandName,
            responseCallback,
            errorCallback
          );
          if (chargingStation.getEnableStatistics() === true) {
            chargingStation.performanceStatistics.addRequestStatistic(commandName, messageType);
          }
          // Check if wsConnection opened
          if (chargingStation.isWebSocketConnectionOpened() === true) {
            // Yes: Send Message
            const beginId = PerformanceStatistics.beginMeasure(commandName);
            // FIXME: Handle sending error
            chargingStation.wsConnection.send(messageToSend);
            PerformanceStatistics.endMeasure(commandName, beginId);
            logger.debug(
              `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${this.getMessageTypeString(
                messageType
              )} payload: ${messageToSend}`
            );
          } else if (params.skipBufferingOnError === false) {
            // Buffer it
            chargingStation.bufferMessage(messageToSend);
            const ocppError = new OCPPError(
              ErrorType.GENERIC_ERROR,
              `WebSocket closed for buffered message id '${messageId}' with content '${messageToSend}'`,
              commandName,
              (messagePayload as JsonObject)?.details ?? {}
            );
            if (messageType === MessageType.CALL_MESSAGE) {
              // Reject it but keep the request in the cache
              return reject(ocppError);
            }
            return errorCallback(ocppError, false);
          } else {
            // Reject it
            return errorCallback(
              new OCPPError(
                ErrorType.GENERIC_ERROR,
                `WebSocket closed for non buffered message id '${messageId}' with content '${messageToSend}'`,
                commandName,
                (messagePayload as JsonObject)?.details ?? {}
              ),
              false
            );
          }
          // Response?
          if (messageType !== MessageType.CALL_MESSAGE) {
            // Yes: send Ok
            return resolve(messagePayload);
          }
          // Request: the promise is settled later by responseCallback or errorCallback
        }),
        Constants.OCPP_WEBSOCKET_TIMEOUT,
        new OCPPError(
          ErrorType.GENERIC_ERROR,
          `Timeout for message id '${messageId}'`,
          commandName,
          (messagePayload as JsonObject)?.details ?? {}
        ),
        // NOTE(review): presumably invoked by Utils.promiseWithTimeout() when the timeout
        // elapses, to drop the pending request from the cache - confirm against Utils
        () => {
          messageType === MessageType.CALL_MESSAGE && chargingStation.requests.delete(messageId);
        }
      );
    }
    throw new OCPPError(
      ErrorType.SECURITY_ERROR,
      `Cannot send command ${commandName} PDU when the charging station is in ${chargingStation.getRegistrationStatus()} state on the central server`,
      commandName
    );
  }

  /**
   * Serializes a message to the JSON string to send.
   *
   * For a request (CALL), the callbacks, command name and payload are also stored in
   * `chargingStation.requests` under the message id.
   *
   * @param chargingStation - Charging station sending the message
   * @param messageId - Identifier of the message
   * @param messagePayload - Payload, or the error to report for an error message
   * @param messageType - Kind of message to build
   * @param commandName - Command the message relates to
   * @param responseCallback - Callback stored for the request's response
   * @param errorCallback - Callback stored for the request's error response
   * @returns The JSON serialized message
   */
  private buildMessageToSend(
    chargingStation: ChargingStation,
    messageId: string,
    messagePayload: JsonType | OCPPError,
    messageType: MessageType,
    commandName?: RequestCommand | IncomingRequestCommand,
    responseCallback?: ResponseCallback,
    errorCallback?: ErrorCallback
  ): string {
    let messageToSend: string;
    // Type of message
    switch (messageType) {
      // Request
      case MessageType.CALL_MESSAGE:
        // Build request
        chargingStation.requests.set(messageId, [
          responseCallback,
          errorCallback,
          commandName,
          messagePayload as JsonType,
        ]);
        messageToSend = JSON.stringify([
          messageType,
          messageId,
          commandName,
          messagePayload,
        ] as OutgoingRequest);
        break;
      // Response
      case MessageType.CALL_RESULT_MESSAGE:
        // Build response
        messageToSend = JSON.stringify([messageType, messageId, messagePayload] as Response);
        break;
      // Error Message
      case MessageType.CALL_ERROR_MESSAGE:
        // Build Error Message
        messageToSend = JSON.stringify([
          messageType,
          messageId,
          (messagePayload as OCPPError)?.code ?? ErrorType.GENERIC_ERROR,
          (messagePayload as OCPPError)?.message ?? '',
          (messagePayload as OCPPError)?.details ?? { commandName },
        ] as ErrorResponse);
        break;
    }
    return messageToSend;
  }

  /**
   * Maps a message type to the label used in log messages.
   *
   * @param messageType - Kind of message
   * @returns `'request'`, `'response'` or `'error'`
   */
  private getMessageTypeString(messageType: MessageType): string {
    switch (messageType) {
      case MessageType.CALL_MESSAGE:
        return 'request';
      case MessageType.CALL_RESULT_MESSAGE:
        return 'response';
      case MessageType.CALL_ERROR_MESSAGE:
        return 'error';
    }
  }

  /**
   * Logs a sending error and optionally rethrows it.
   *
   * @param chargingStation - Charging station the error relates to
   * @param commandName - Command being sent when the error occurred
   * @param error - Error to log
   * @param params - `throwError: true` rethrows the error after logging it
   * @throws The given error when `params.throwError` is `true`
   */
  private handleSendMessageError(
    chargingStation: ChargingStation,
    commandName: RequestCommand | IncomingRequestCommand,
    error: Error,
    params: HandleErrorParams<EmptyObject> = { throwError: false }
  ): void {
    logger.error(`${chargingStation.logPrefix()} Request command '${commandName}' error:`, error);
    if (params?.throwError === true) {
      throw error;
    }
  }

  /**
   * Handles the sending of a request command; implemented by concrete subclasses.
   *
   * @param chargingStation - Charging station sending the request
   * @param commandName - Request command
   * @param commandParams - Command parameters
   * @param params - Sending options
   * @returns A promise of the response
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  public abstract requestHandler<RequestType extends JsonType, ResponseType extends JsonType>(
    chargingStation: ChargingStation,
    commandName: RequestCommand,
    commandParams?: JsonType,
    params?: RequestParams
  ): Promise<ResponseType>;
}