e00e2b99f9a2bbbddbdca10a6ae1427ffe982d2c
[e-mobility-charging-stations-simulator.git] / src / charging-station / ocpp / OCPPRequestService.ts
1 import { parentPort } from 'worker_threads';
2
3 import type { JSONSchemaType } from 'ajv';
4 import Ajv from 'ajv-draft-04';
5 import ajvFormats from 'ajv-formats';
6
7 import OCPPError from '../../exception/OCPPError';
8 import PerformanceStatistics from '../../performance/PerformanceStatistics';
9 import type { EmptyObject } from '../../types/EmptyObject';
10 import type { HandleErrorParams } from '../../types/Error';
11 import type { JsonObject, JsonType } from '../../types/JsonType';
12 import { ErrorType } from '../../types/ocpp/ErrorType';
13 import { MessageType } from '../../types/ocpp/MessageType';
14 import {
15 IncomingRequestCommand,
16 OutgoingRequest,
17 RequestCommand,
18 RequestParams,
19 ResponseType,
20 } from '../../types/ocpp/Requests';
21 import type { ErrorResponse, Response } from '../../types/ocpp/Responses';
22 import Constants from '../../utils/Constants';
23 import logger from '../../utils/Logger';
24 import Utils from '../../utils/Utils';
25 import type ChargingStation from '../ChargingStation';
26 import { MessageChannelUtils } from '../MessageChannelUtils';
27 import type OCPPResponseService from './OCPPResponseService';
28 import { OCPPServiceUtils } from './OCPPServiceUtils';
29
30 const moduleName = 'OCPPRequestService';
31
32 export default abstract class OCPPRequestService {
33 private static instance: OCPPRequestService | null = null;
34 private ajv: Ajv;
35
36 private readonly ocppResponseService: OCPPResponseService;
37
38 protected constructor(ocppResponseService: OCPPResponseService) {
39 this.ocppResponseService = ocppResponseService;
40 this.ajv = new Ajv();
41 ajvFormats(this.ajv);
42 this.requestHandler.bind(this);
43 this.sendResponse.bind(this);
44 this.sendError.bind(this);
45 this.internalSendMessage.bind(this);
46 this.buildMessageToSend.bind(this);
47 this.validateRequestPayload.bind(this);
48 }
49
50 public static getInstance<T extends OCPPRequestService>(
51 this: new (ocppResponseService: OCPPResponseService) => T,
52 ocppResponseService: OCPPResponseService
53 ): T {
54 if (OCPPRequestService.instance === null) {
55 OCPPRequestService.instance = new this(ocppResponseService);
56 }
57 return OCPPRequestService.instance as T;
58 }
59
60 public async sendResponse(
61 chargingStation: ChargingStation,
62 messageId: string,
63 messagePayload: JsonType,
64 commandName: IncomingRequestCommand
65 ): Promise<ResponseType> {
66 try {
67 // Send response message
68 return await this.internalSendMessage(
69 chargingStation,
70 messageId,
71 messagePayload,
72 MessageType.CALL_RESULT_MESSAGE,
73 commandName
74 );
75 } catch (error) {
76 this.handleRequestError(chargingStation, commandName, error as Error);
77 }
78 }
79
80 public async sendError(
81 chargingStation: ChargingStation,
82 messageId: string,
83 ocppError: OCPPError,
84 commandName: RequestCommand | IncomingRequestCommand
85 ): Promise<ResponseType> {
86 try {
87 // Send error message
88 return await this.internalSendMessage(
89 chargingStation,
90 messageId,
91 ocppError,
92 MessageType.CALL_ERROR_MESSAGE,
93 commandName
94 );
95 } catch (error) {
96 this.handleRequestError(chargingStation, commandName, error as Error);
97 }
98 }
99
100 protected async sendMessage(
101 chargingStation: ChargingStation,
102 messageId: string,
103 messagePayload: JsonType,
104 commandName: RequestCommand,
105 params: RequestParams = {
106 skipBufferingOnError: false,
107 triggerMessage: false,
108 }
109 ): Promise<ResponseType> {
110 try {
111 return await this.internalSendMessage(
112 chargingStation,
113 messageId,
114 messagePayload,
115 MessageType.CALL_MESSAGE,
116 commandName,
117 params
118 );
119 } catch (error) {
120 this.handleRequestError(chargingStation, commandName, error as Error, { throwError: false });
121 }
122 }
123
124 protected validateRequestPayload<T extends JsonType>(
125 chargingStation: ChargingStation,
126 commandName: RequestCommand,
127 schema: JSONSchemaType<T>,
128 payload: T
129 ): boolean {
130 if (!chargingStation.getPayloadSchemaValidation()) {
131 return true;
132 }
133 const validate = this.ajv.compile(schema);
134 if (validate(payload)) {
135 return true;
136 }
137 logger.error(
138 `${chargingStation.logPrefix()} ${moduleName}.validateRequestPayload: Request PDU is invalid: %j`,
139 validate.errors
140 );
141 throw new OCPPError(
142 OCPPServiceUtils.ajvErrorsToErrorType(validate.errors),
143 'Request PDU is invalid',
144 commandName,
145 JSON.stringify(validate.errors, null, 2)
146 );
147 }
148
149 private async internalSendMessage(
150 chargingStation: ChargingStation,
151 messageId: string,
152 messagePayload: JsonType | OCPPError,
153 messageType: MessageType,
154 commandName?: RequestCommand | IncomingRequestCommand,
155 params: RequestParams = {
156 skipBufferingOnError: false,
157 triggerMessage: false,
158 }
159 ): Promise<ResponseType> {
160 if (
161 (chargingStation.isInUnknownState() && commandName === RequestCommand.BOOT_NOTIFICATION) ||
162 (!chargingStation.getOcppStrictCompliance() && chargingStation.isInUnknownState()) ||
163 chargingStation.isInAcceptedState() ||
164 (chargingStation.isInPendingState() &&
165 (params.triggerMessage || messageType === MessageType.CALL_RESULT_MESSAGE))
166 ) {
167 // eslint-disable-next-line @typescript-eslint/no-this-alias
168 const self = this;
169 // Send a message through wsConnection
170 return Utils.promiseWithTimeout(
171 new Promise((resolve, reject) => {
172 const messageToSend = this.buildMessageToSend(
173 chargingStation,
174 messageId,
175 messagePayload,
176 messageType,
177 commandName,
178 responseCallback,
179 errorCallback
180 );
181 if (chargingStation.getEnableStatistics()) {
182 chargingStation.performanceStatistics.addRequestStatistic(commandName, messageType);
183 }
184 // Check if wsConnection opened
185 if (chargingStation.isWebSocketConnectionOpened()) {
186 // Yes: Send Message
187 const beginId = PerformanceStatistics.beginMeasure(commandName);
188 // FIXME: Handle sending error
189 chargingStation.wsConnection.send(messageToSend);
190 PerformanceStatistics.endMeasure(commandName, beginId);
191 logger.debug(
192 `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${this.getMessageTypeString(
193 messageType
194 )} payload: ${messageToSend}`
195 );
196 } else if (!params.skipBufferingOnError) {
197 // Buffer it
198 chargingStation.bufferMessage(messageToSend);
199 const ocppError = new OCPPError(
200 ErrorType.GENERIC_ERROR,
201 `WebSocket closed for buffered message id '${messageId}' with content '${messageToSend}'`,
202 commandName,
203 (messagePayload as JsonObject)?.details ?? {}
204 );
205 if (messageType === MessageType.CALL_MESSAGE) {
206 // Reject it but keep the request in the cache
207 return reject(ocppError);
208 }
209 return errorCallback(ocppError, false);
210 } else {
211 // Reject it
212 return errorCallback(
213 new OCPPError(
214 ErrorType.GENERIC_ERROR,
215 `WebSocket closed for non buffered message id '${messageId}' with content '${messageToSend}'`,
216 commandName,
217 (messagePayload as JsonObject)?.details ?? {}
218 ),
219 false
220 );
221 }
222 // Response?
223 if (messageType !== MessageType.CALL_MESSAGE) {
224 // Yes: send Ok
225 return resolve(messagePayload);
226 }
227
228 /**
229 * Function that will receive the request's response
230 *
231 * @param payload
232 * @param requestPayload
233 */
234 async function responseCallback(
235 payload: JsonType,
236 requestPayload: JsonType
237 ): Promise<void> {
238 if (chargingStation.getEnableStatistics()) {
239 chargingStation.performanceStatistics.addRequestStatistic(
240 commandName,
241 MessageType.CALL_RESULT_MESSAGE
242 );
243 }
244 // Handle the request's response
245 try {
246 await self.ocppResponseService.responseHandler(
247 chargingStation,
248 commandName as RequestCommand,
249 payload,
250 requestPayload
251 );
252 resolve(payload);
253 } catch (error) {
254 reject(error);
255 } finally {
256 chargingStation.requests.delete(messageId);
257 // parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(chargingStation));
258 }
259 }
260
261 /**
262 * Function that will receive the request's error response
263 *
264 * @param error
265 * @param requestStatistic
266 */
267 function errorCallback(error: OCPPError, requestStatistic = true): void {
268 if (requestStatistic && chargingStation.getEnableStatistics()) {
269 chargingStation.performanceStatistics.addRequestStatistic(
270 commandName,
271 MessageType.CALL_ERROR_MESSAGE
272 );
273 }
274 logger.error(
275 `${chargingStation.logPrefix()} Error occurred when calling command ${commandName} with message data ${JSON.stringify(
276 messagePayload
277 )}:`,
278 error
279 );
280 chargingStation.requests.delete(messageId);
281 // parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(chargingStation));
282 reject(error);
283 }
284 }),
285 Constants.OCPP_WEBSOCKET_TIMEOUT,
286 new OCPPError(
287 ErrorType.GENERIC_ERROR,
288 `Timeout for message id '${messageId}'`,
289 commandName,
290 (messagePayload as JsonObject)?.details ?? {}
291 ),
292 () => {
293 messageType === MessageType.CALL_MESSAGE && chargingStation.requests.delete(messageId);
294 }
295 );
296 }
297 throw new OCPPError(
298 ErrorType.SECURITY_ERROR,
299 `Cannot send command ${commandName} PDU when the charging station is in ${chargingStation.getRegistrationStatus()} state on the central server`,
300 commandName
301 );
302 }
303
304 private buildMessageToSend(
305 chargingStation: ChargingStation,
306 messageId: string,
307 messagePayload: JsonType | OCPPError,
308 messageType: MessageType,
309 commandName?: RequestCommand | IncomingRequestCommand,
310 responseCallback?: (payload: JsonType, requestPayload: JsonType) => Promise<void>,
311 errorCallback?: (error: OCPPError, requestStatistic?: boolean) => void
312 ): string {
313 let messageToSend: string;
314 // Type of message
315 switch (messageType) {
316 // Request
317 case MessageType.CALL_MESSAGE:
318 // Build request
319 chargingStation.requests.set(messageId, [
320 responseCallback,
321 errorCallback,
322 commandName,
323 messagePayload as JsonType,
324 ]);
325 messageToSend = JSON.stringify([
326 messageType,
327 messageId,
328 commandName,
329 messagePayload,
330 ] as OutgoingRequest);
331 break;
332 // Response
333 case MessageType.CALL_RESULT_MESSAGE:
334 // Build response
335 messageToSend = JSON.stringify([messageType, messageId, messagePayload] as Response);
336 break;
337 // Error Message
338 case MessageType.CALL_ERROR_MESSAGE:
339 // Build Error Message
340 messageToSend = JSON.stringify([
341 messageType,
342 messageId,
343 (messagePayload as OCPPError)?.code ?? ErrorType.GENERIC_ERROR,
344 (messagePayload as OCPPError)?.message ?? '',
345 (messagePayload as OCPPError)?.details ?? { commandName },
346 ] as ErrorResponse);
347 break;
348 }
349 return messageToSend;
350 }
351
352 private getMessageTypeString(messageType: MessageType): string {
353 switch (messageType) {
354 case MessageType.CALL_MESSAGE:
355 return 'request';
356 case MessageType.CALL_RESULT_MESSAGE:
357 return 'response';
358 case MessageType.CALL_ERROR_MESSAGE:
359 return 'error';
360 }
361 }
362
363 private handleRequestError(
364 chargingStation: ChargingStation,
365 commandName: RequestCommand | IncomingRequestCommand,
366 error: Error,
367 params: HandleErrorParams<EmptyObject> = { throwError: true }
368 ): void {
369 logger.error(`${chargingStation.logPrefix()} Request command ${commandName} error:`, error);
370 if (params?.throwError) {
371 throw error;
372 }
373 }
374
375 // eslint-disable-next-line @typescript-eslint/no-unused-vars
376 public abstract requestHandler<RequestType extends JsonType, ResponseType extends JsonType>(
377 chargingStation: ChargingStation,
378 commandName: RequestCommand,
379 commandParams?: JsonType,
380 params?: RequestParams
381 ): Promise<ResponseType>;
382 }