aboutsummaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java49
-rw-r--r--src/main/java/org/onap/dcae/CollectorSchemas.java74
-rw-r--r--src/main/java/org/onap/dcae/SchemaValidator.java77
-rw-r--r--src/main/java/org/onap/dcae/VesApplication.java2
-rw-r--r--src/main/java/org/onap/dcae/restapi/ServletConfig.java3
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java297
6 files changed, 182 insertions, 320 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index e4621849..865b0d14 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -21,8 +21,14 @@
package org.onap.dcae;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
import com.google.common.annotations.VisibleForTesting;
import io.vavr.Function1;
+import io.vavr.Tuple2;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
@@ -34,10 +40,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
+import static io.vavr.API.Tuple;
+import static java.lang.String.format;
+import static java.nio.file.Files.readAllBytes;
import static java.util.Arrays.stream;
/**
@@ -47,10 +57,12 @@ import static java.util.Arrays.stream;
@Component
public class ApplicationSettings {
- private static final Logger inlog = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final Logger log = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final String FALLBACK_VES_VERSION = "v5";
private final String appInvocationDir;
private final String configurationFileLocation;
private final PropertiesConfiguration properties = new PropertiesConfiguration();
+ private final Map<String, JsonSchema> loadedJsonSchemas;
public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
this(args, argsParser, System.getProperty("user.dir"));
@@ -63,13 +75,14 @@ public class ApplicationSettings {
configurationFileLocation = findOutConfigurationFileLocation(parsedArgs);
loadPropertiesFromFile();
parsedArgs.filterKeys(k -> !k.equals("c")).forEach(this::updateProperty);
+ loadedJsonSchemas = loadJsonSchemas();
}
private void loadPropertiesFromFile() {
try {
properties.load(configurationFileLocation);
} catch (ConfigurationException ex) {
- inlog.error("Cannot load properties cause:", ex);
+ log.error("Cannot load properties cause:", ex);
throw new RuntimeException(ex);
}
}
@@ -103,9 +116,28 @@ public class ApplicationSettings {
return properties.getInt("header.authflag", 0) > 0;
}
- public JSONObject jsonSchema() {
- return new JSONObject(
- properties.getString("collector.schema.file", "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"));
+ public JsonSchema jsonSchema(String version) {
+ return loadedJsonSchemas.get(version)
+ .orElse(loadedJsonSchemas.get(FALLBACK_VES_VERSION))
+ .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application."));
+ }
+
+ private Map<String, JsonSchema> loadJsonSchemas() {
+ return jsonSchema().toMap().entrySet().stream()
+ .map(versionToFilePath -> readSchemaForVersion(versionToFilePath))
+ .collect(HashMap.collector());
+ }
+
+ private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
+ try {
+ String schemaContent = new String(
+ readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
+ JsonNode schemaNode = JsonLoader.fromString(schemaContent);
+ JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
+ return Tuple(versionToFilePath.getKey(), schema);
+ } catch (IOException | ProcessingException e) {
+ throw new RuntimeException("Could not read schema from path: " + versionToFilePath.getValue(), e);
+ }
}
public int httpPort() {
@@ -157,6 +189,11 @@ public class ApplicationSettings {
}
}
+ private JSONObject jsonSchema() {
+ return new JSONObject(properties.getString("collector.schema.file",
+ format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)));
+ }
+
private Map<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) {
java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>();
String[] topics = streamIdsProperty.split("\\|");
@@ -176,7 +213,7 @@ public class ApplicationSettings {
}
}
- public String prependWithUserDirOnRelative(String filePath) {
+ private String prependWithUserDirOnRelative(String filePath) {
if (!Paths.get(filePath).isAbsolute()) {
filePath = Paths.get(appInvocationDir, filePath).toString();
}
diff --git a/src/main/java/org/onap/dcae/CollectorSchemas.java b/src/main/java/org/onap/dcae/CollectorSchemas.java
deleted file mode 100644
index fc12b1f9..00000000
--- a/src/main/java/org/onap/dcae/CollectorSchemas.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.s
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae;
-
-import static java.nio.file.Files.readAllBytes;
-import static java.util.stream.Collectors.toMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.AbstractMap;
-import java.util.Map;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.VesRestController;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CollectorSchemas {
-
- private static final Logger LOG = (Logger) LoggerFactory.getLogger(VesRestController.class);
-
- @Autowired
- private ApplicationSettings collectorProperties;
-
- //refactor is needed in next iteration
- public Map<String, JsonSchema> getJSONSchemasMap(String version) {
- JSONObject jsonObject = collectorProperties.jsonSchema();
- Map<String, JsonSchema> schemas = jsonObject.toMap().entrySet().stream().map(
- versionToFilePath -> {
- try {
- String schemaContent = new String(
- readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
- JsonNode schemaNode = JsonLoader.fromString(schemaContent);
- JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
- return new AbstractMap.SimpleEntry<>(versionToFilePath.getKey(), schema);
- } catch (IOException | ProcessingException e) {
- LOG.error("Could not read schema from path: " + versionToFilePath.getValue(), e);
- throw new RuntimeException(
- "Could not read schema from path: " + versionToFilePath.getValue(), e);
- }
- }
- ).collect(toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
- if (schemas.get(version) == null && collectorProperties.eventTransformingEnabled()) {
- LOG.error(String.format("Missing necessary %s JSON schema", version));
- throw new RuntimeException(String.format("Missing necessary %s JSON schema", version));
- }
- return schemas;
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/SchemaValidator.java b/src/main/java/org/onap/dcae/SchemaValidator.java
deleted file mode 100644
index e4b52cfb..00000000
--- a/src/main/java/org/onap/dcae/SchemaValidator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.s
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.core.report.ProcessingMessage;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class SchemaValidator {
-
- private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
-
- //refactor in next iteration
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
-}
diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java
index 7eea0eb0..d9d12e35 100644
--- a/src/main/java/org/onap/dcae/VesApplication.java
+++ b/src/main/java/org/onap/dcae/VesApplication.java
@@ -109,7 +109,7 @@ public class VesApplication {
}
@Bean
- @Qualifier("metriclog")
+ @Qualifier("metricsLog")
public Logger incomingRequestsMetricsLogger() {
return metriclog;
}
diff --git a/src/main/java/org/onap/dcae/restapi/ServletConfig.java b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
index e8efa375..871904c6 100644
--- a/src/main/java/org/onap/dcae/restapi/ServletConfig.java
+++ b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
@@ -22,7 +22,6 @@
package org.onap.dcae.restapi;
import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.SchemaValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,7 +38,7 @@ import static java.nio.file.Files.readAllBytes;
@Component
public class ServletConfig implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {
- private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
+ private static final Logger log = LoggerFactory.getLogger(ServletConfig.class);
@Autowired
private ApplicationSettings properties;
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
index 58b52765..21fa685c 100644
--- a/src/main/java/org/onap/dcae/restapi/VesRestController.java
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -21,9 +21,8 @@
package org.onap.dcae.restapi;
-import static java.util.Optional.ofNullable;
import static java.util.stream.StreamSupport.stream;
-import static org.springframework.http.ResponseEntity.ok;
+import static org.springframework.http.ResponseEntity.accepted;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
@@ -39,7 +38,6 @@ import javax.servlet.http.HttpServletRequest;
import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.CollectorSchemas;
import org.onap.dcae.commonFunction.VESLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,173 +52,152 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class VesRestController {
+ private static final Logger log = LoggerFactory.getLogger(VesRestController.class);
+
+ private final ApplicationSettings applicationSettings;
+ private final LinkedBlockingQueue<JSONObject> inputQueue;
+
+ private final Logger metricsLog;
+ private final Logger errorLog;
+ private final Logger incomingRequestsLogger;
+
+ @Autowired
+ VesRestController(ApplicationSettings applicationSettings,
+ @Qualifier("metricsLog") Logger metricsLog,
+ @Qualifier("errorLog") Logger errorLog,
+ @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
+ @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
+ this.applicationSettings = applicationSettings;
+ this.metricsLog = metricsLog;
+ this.errorLog = errorLog;
+ this.incomingRequestsLogger = incomingRequestsLogger;
+ this.inputQueue = inputQueue;
+ }
- private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
-
- private static final String FALLBACK_VES_VERSION = "v5";
-
- @Autowired private ApplicationSettings collectorProperties;
-
- @Autowired private CollectorSchemas schemas;
-
- @Autowired
- @Qualifier("metriclog")
- private Logger metriclog;
-
- @Autowired
- @Qualifier("incomingRequestsLogger")
- private Logger incomingRequestsLogger;
-
- @Autowired
- @Qualifier("errorLog")
- private Logger errorLog;
-
- private LinkedBlockingQueue<JSONObject> inputQueue;
- private String version;
-
- @Autowired
- VesRestController(
- @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
- @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
- this.incomingRequestsLogger = incomingRequestsLogger;
- this.inputQueue = inputQueue;
- }
-
- @GetMapping("/")
- String mainPage() {
- return "Welcome to VESCollector";
- }
-
- // refactor in next iteration
- @PostMapping(
- value = {
- "/eventListener/v1",
- "/eventListener/v1/eventBatch",
- "/eventListener/v2",
- "/eventListener/v2/eventBatch",
- "/eventListener/v3",
- "/eventListener/v3/eventBatch",
- "/eventListener/v4",
- "/eventListener/v4/eventBatch",
- "/eventListener/v5",
- "/eventListener/v5/eventBatch",
- "/eventListener/v7",
- "/eventListener/v7/eventBatch"
- },
- consumes = "application/json")
- ResponseEntity<String> receiveEvent(
- @RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
- String request = httpServletRequest.getRequestURI();
- extractVersion(request);
-
- JSONObject jsonObject;
- try {
- jsonObject = new JSONObject(jsonPayload);
- } catch (Exception e) {
- return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ @GetMapping("/")
+ String mainPage() {
+ return "Welcome to VESCollector";
}
- String uuid = setUpECOMPLoggingForRequest();
- incomingRequestsLogger.info(
- String.format(
- "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
- jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+ //refactor in next iteration
+ @PostMapping(value = {"/eventListener/v1",
+ "/eventListener/v1/eventBatch",
+ "/eventListener/v2",
+ "/eventListener/v2/eventBatch",
+ "/eventListener/v3",
+ "/eventListener/v3/eventBatch",
+ "/eventListener/v4",
+ "/eventListener/v4/eventBatch",
+ "/eventListener/v5",
+ "/eventListener/v5/eventBatch",
+ "/eventListener/v7",
+ "/eventListener/v7/eventBatch"}, consumes = "application/json")
+ ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
+ String request = httpServletRequest.getRequestURI();
+ String version = extractVersion(request);
+
+ JSONObject jsonObject;
+ try {
+ jsonObject = new JSONObject(jsonPayload);
+ } catch (Exception e) {
+ return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ }
- if (collectorProperties.jsonSchemaValidationEnabled()) {
- if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ String uuid = setUpECOMPLoggingForRequest();
+ incomingRequestsLogger.info(String.format(
+ "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
+ jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+
+ if (applicationSettings.jsonSchemaValidationEnabled()) {
+ if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else {
+ return errorResponse(ApiException.INVALID_JSON_INPUT);
+ }
}
- } else if (!isBatchRequest(request)
- && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+
+ JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+
+ if (!putEventsOnProcessingQueue(commonlyFormatted)) {
+ errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
+ return errorResponse(ApiException.NO_SERVER_RESOURCES);
}
- } else {
- return errorResponse(ApiException.INVALID_JSON_INPUT);
- }
+ return accepted()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body("Accepted");
}
- JSONArray commonlyFormatted =
- convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+ private String extractVersion(String httpServletRequest) {
+ return httpServletRequest.split("/")[2];
+ }
- if (!putEventsOnProcessingQueue(commonlyFormatted)) {
- errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
- return errorResponse(ApiException.NO_SERVER_RESOURCES);
+ private ResponseEntity<String> errorResponse(ApiException noServerResources) {
+ return ResponseEntity.status(noServerResources.httpStatusCode)
+ .body(noServerResources.toJSON().toString());
}
- // HttpStatus.SC_NO_CONTENT
- return org.springframework.http.ResponseEntity.accepted()
- .contentType(MediaType.APPLICATION_JSON)
- .body("Accepted");
- }
-
- private void extractVersion(String httpServletRequest) {
- version = httpServletRequest.split("/")[2];
- }
-
- private ResponseEntity<String> errorResponse(ApiException noServerResources) {
- return ResponseEntity.status(noServerResources.httpStatusCode)
- .body(noServerResources.toJSON().toString());
- }
-
- private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
- for (int i = 0; i < arrayOfEvents.length(); i++) {
- metriclog.info("EVENT_PUBLISH_START");
- if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
- return false;
- }
+
+ private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
+ for (int i = 0; i < arrayOfEvents.length(); i++) {
+ metricsLog.info("EVENT_PUBLISH_START");
+ if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
+ return false;
+ }
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ metricsLog.info("EVENT_PUBLISH_END");
+ return true;
}
- LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- metriclog.info("EVENT_PUBLISH_END");
- return true;
- }
-
- private boolean conformsToSchema(JSONObject payload, String version) {
- try {
- JsonSchema schema =
- ofNullable(schemas.getJSONSchemasMap(version).get(version))
- .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
- ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
- if (!report.isSuccess()) {
- LOG.warn("Schema validation failed for event: " + payload);
- stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
- return false;
- }
- return report.isSuccess();
- } catch (Exception e) {
- throw new RuntimeException("Unable to validate against schema", e);
+
+ private boolean conformsToSchema(JSONObject payload, String version) {
+ try {
+ JsonSchema schema = applicationSettings.jsonSchema(version);
+ ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+ if (!report.isSuccess()) {
+ log.warn("Schema validation failed for event: " + payload);
+ stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
+ return false;
+ }
+ return report.isSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to validate against schema", e);
+ }
+ }
+
+ private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
+ String uuid, String version) {
+ JSONArray asArrayEvents = new JSONArray();
+ String vesUniqueIdKey = "VESuniqueId";
+ String vesVersionKey = "VESversion";
+ if (isBatchRequest(request)) {
+ JSONArray events = jsonObject.getJSONArray("eventList");
+ for (int i = 0; i < events.length(); i++) {
+ JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
+ event.put(vesUniqueIdKey, uuid + "-" + i);
+ event.put(vesVersionKey, version);
+ asArrayEvents.put(event);
+ }
+ } else {
+ jsonObject.put(vesUniqueIdKey, uuid);
+ jsonObject.put(vesVersionKey, version);
+ asArrayEvents = new JSONArray().put(jsonObject);
+ }
+ return asArrayEvents;
}
- }
-
- private static JSONArray convertToJSONArrayCommonFormat(
- JSONObject jsonObject, String request, String uuid, String version) {
- JSONArray asArrayEvents = new JSONArray();
- String vesUniqueIdKey = "VESuniqueId";
- String vesVersionKey = "VESversion";
- if (isBatchRequest(request)) {
- JSONArray events = jsonObject.getJSONArray("eventList");
- for (int i = 0; i < events.length(); i++) {
- JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
- event.put(vesUniqueIdKey, uuid + "-" + i);
- event.put(vesVersionKey, version);
- asArrayEvents.put(event);
- }
- } else {
- jsonObject.put(vesUniqueIdKey, uuid);
- jsonObject.put(vesVersionKey, version);
- asArrayEvents = new JSONArray().put(jsonObject);
+
+ private static String setUpECOMPLoggingForRequest() {
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ return uuid.toString();
+ }
+
+ private static boolean isBatchRequest(String request) {
+ return request.contains("eventBatch");
}
- return asArrayEvents;
- }
-
- private static String setUpECOMPLoggingForRequest() {
- final UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- return uuid.toString();
- }
-
- private static boolean isBatchRequest(String request) {
- return request.contains("eventBatch");
- }
-}
+} \ No newline at end of file