summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPawelSzalapski <pawel.szalapski@nokia.com>2018-08-08 11:01:03 +0200
committerPawelSzalapski <pawel.szalapski@nokia.com>2018-08-10 14:21:34 +0200
commite1ccd83e39090f16a446b23e154b8e28530e2fdd (patch)
treec38f54e6690f58438cb5e66fed2f6aad177b4bfc
parent5deddeb4892243627ad342a41d4dcef0f7280a29 (diff)
Refactor the code base a bit
Remove potential race condition coming from shared 'version' veriable in singleton instance of RestController. Move the logic behind reading the json schemas out of a on-request phase to application startup. Minor refactoring done that will bump up test coverage. Change-Id: I2ad1ba91dafafd785ede61591a4dc146abf6a1eb Signed-off-by: PawelSzalapski <pawel.szalapski@nokia.com> Issue-ID: DCAEGEN2-526
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java49
-rw-r--r--src/main/java/org/onap/dcae/CollectorSchemas.java74
-rw-r--r--src/main/java/org/onap/dcae/SchemaValidator.java77
-rw-r--r--src/main/java/org/onap/dcae/VesApplication.java2
-rw-r--r--src/main/java/org/onap/dcae/restapi/ServletConfig.java3
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java297
-rw-r--r--src/test/java/org/onap/dcae/ApplicationSettingsTest.java32
-rw-r--r--src/test/java/org/onap/dcae/TestingUtilities.java1
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java4
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java83
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestingUtilities.java93
11 files changed, 204 insertions, 511 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index e4621849..865b0d14 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -21,8 +21,14 @@
package org.onap.dcae;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
import com.google.common.annotations.VisibleForTesting;
import io.vavr.Function1;
+import io.vavr.Tuple2;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
@@ -34,10 +40,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
+import static io.vavr.API.Tuple;
+import static java.lang.String.format;
+import static java.nio.file.Files.readAllBytes;
import static java.util.Arrays.stream;
/**
@@ -47,10 +57,12 @@ import static java.util.Arrays.stream;
@Component
public class ApplicationSettings {
- private static final Logger inlog = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final Logger log = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final String FALLBACK_VES_VERSION = "v5";
private final String appInvocationDir;
private final String configurationFileLocation;
private final PropertiesConfiguration properties = new PropertiesConfiguration();
+ private final Map<String, JsonSchema> loadedJsonSchemas;
public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
this(args, argsParser, System.getProperty("user.dir"));
@@ -63,13 +75,14 @@ public class ApplicationSettings {
configurationFileLocation = findOutConfigurationFileLocation(parsedArgs);
loadPropertiesFromFile();
parsedArgs.filterKeys(k -> !k.equals("c")).forEach(this::updateProperty);
+ loadedJsonSchemas = loadJsonSchemas();
}
private void loadPropertiesFromFile() {
try {
properties.load(configurationFileLocation);
} catch (ConfigurationException ex) {
- inlog.error("Cannot load properties cause:", ex);
+ log.error("Cannot load properties cause:", ex);
throw new RuntimeException(ex);
}
}
@@ -103,9 +116,28 @@ public class ApplicationSettings {
return properties.getInt("header.authflag", 0) > 0;
}
- public JSONObject jsonSchema() {
- return new JSONObject(
- properties.getString("collector.schema.file", "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}"));
+ public JsonSchema jsonSchema(String version) {
+ return loadedJsonSchemas.get(version)
+ .orElse(loadedJsonSchemas.get(FALLBACK_VES_VERSION))
+ .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application."));
+ }
+
+ private Map<String, JsonSchema> loadJsonSchemas() {
+ return jsonSchema().toMap().entrySet().stream()
+ .map(versionToFilePath -> readSchemaForVersion(versionToFilePath))
+ .collect(HashMap.collector());
+ }
+
+ private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
+ try {
+ String schemaContent = new String(
+ readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
+ JsonNode schemaNode = JsonLoader.fromString(schemaContent);
+ JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
+ return Tuple(versionToFilePath.getKey(), schema);
+ } catch (IOException | ProcessingException e) {
+ throw new RuntimeException("Could not read schema from path: " + versionToFilePath.getValue(), e);
+ }
}
public int httpPort() {
@@ -157,6 +189,11 @@ public class ApplicationSettings {
}
}
+ private JSONObject jsonSchema() {
+ return new JSONObject(properties.getString("collector.schema.file",
+ format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)));
+ }
+
private Map<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) {
java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>();
String[] topics = streamIdsProperty.split("\\|");
@@ -176,7 +213,7 @@ public class ApplicationSettings {
}
}
- public String prependWithUserDirOnRelative(String filePath) {
+ private String prependWithUserDirOnRelative(String filePath) {
if (!Paths.get(filePath).isAbsolute()) {
filePath = Paths.get(appInvocationDir, filePath).toString();
}
diff --git a/src/main/java/org/onap/dcae/CollectorSchemas.java b/src/main/java/org/onap/dcae/CollectorSchemas.java
deleted file mode 100644
index fc12b1f9..00000000
--- a/src/main/java/org/onap/dcae/CollectorSchemas.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.s
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae;
-
-import static java.nio.file.Files.readAllBytes;
-import static java.util.stream.Collectors.toMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.AbstractMap;
-import java.util.Map;
-import org.json.JSONObject;
-import org.onap.dcae.restapi.VesRestController;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CollectorSchemas {
-
- private static final Logger LOG = (Logger) LoggerFactory.getLogger(VesRestController.class);
-
- @Autowired
- private ApplicationSettings collectorProperties;
-
- //refactor is needed in next iteration
- public Map<String, JsonSchema> getJSONSchemasMap(String version) {
- JSONObject jsonObject = collectorProperties.jsonSchema();
- Map<String, JsonSchema> schemas = jsonObject.toMap().entrySet().stream().map(
- versionToFilePath -> {
- try {
- String schemaContent = new String(
- readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
- JsonNode schemaNode = JsonLoader.fromString(schemaContent);
- JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
- return new AbstractMap.SimpleEntry<>(versionToFilePath.getKey(), schema);
- } catch (IOException | ProcessingException e) {
- LOG.error("Could not read schema from path: " + versionToFilePath.getValue(), e);
- throw new RuntimeException(
- "Could not read schema from path: " + versionToFilePath.getValue(), e);
- }
- }
- ).collect(toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
- if (schemas.get(version) == null && collectorProperties.eventTransformingEnabled()) {
- LOG.error(String.format("Missing necessary %s JSON schema", version));
- throw new RuntimeException(String.format("Missing necessary %s JSON schema", version));
- }
- return schemas;
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/SchemaValidator.java b/src/main/java/org/onap/dcae/SchemaValidator.java
deleted file mode 100644
index e4b52cfb..00000000
--- a/src/main/java/org/onap/dcae/SchemaValidator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.s
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.exceptions.ProcessingException;
-import com.github.fge.jsonschema.core.report.ProcessingMessage;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-import com.github.fge.jsonschema.main.JsonSchemaFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class SchemaValidator {
-
- private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
-
- //refactor in next iteration
- public static String validateAgainstSchema(String jsonData, String jsonSchema) {
- ProcessingReport report;
- String result = "false";
-
- try {
- log.trace("Schema validation for event:" + jsonData);
- JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
- JsonNode data = JsonLoader.fromString(jsonData);
- JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
- JsonSchema schema = factory.getJsonSchema(schemaNode);
- report = schema.validate(data);
- } catch (JsonParseException e) {
- log.error("validateAgainstSchema:JsonParseException for event:" + jsonData);
- return e.getMessage();
- } catch (ProcessingException e) {
- log.error("validateAgainstSchema:Processing exception for event:" + jsonData);
- return e.getMessage();
- } catch (IOException e) {
- log.error(
- "validateAgainstSchema:IO exception; something went wrong trying to read json data for event:" + jsonData);
- return e.getMessage();
- }
- if (report != null) {
- for (ProcessingMessage pm : report) {
- log.trace("Processing Message: " + pm.getMessage());
- }
- result = String.valueOf(report.isSuccess());
- }
- try {
- log.debug("Validation Result:" + result + " Validation report:" + report);
- } catch (NullPointerException e) {
- log.error("validateAgainstSchema:NullpointerException on report");
- }
- return result;
- }
-}
diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java
index 7eea0eb0..d9d12e35 100644
--- a/src/main/java/org/onap/dcae/VesApplication.java
+++ b/src/main/java/org/onap/dcae/VesApplication.java
@@ -109,7 +109,7 @@ public class VesApplication {
}
@Bean
- @Qualifier("metriclog")
+ @Qualifier("metricsLog")
public Logger incomingRequestsMetricsLogger() {
return metriclog;
}
diff --git a/src/main/java/org/onap/dcae/restapi/ServletConfig.java b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
index e8efa375..871904c6 100644
--- a/src/main/java/org/onap/dcae/restapi/ServletConfig.java
+++ b/src/main/java/org/onap/dcae/restapi/ServletConfig.java
@@ -22,7 +22,6 @@
package org.onap.dcae.restapi;
import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.SchemaValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,7 +38,7 @@ import static java.nio.file.Files.readAllBytes;
@Component
public class ServletConfig implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {
- private static final Logger log = LoggerFactory.getLogger(SchemaValidator.class);
+ private static final Logger log = LoggerFactory.getLogger(ServletConfig.class);
@Autowired
private ApplicationSettings properties;
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
index 58b52765..21fa685c 100644
--- a/src/main/java/org/onap/dcae/restapi/VesRestController.java
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -21,9 +21,8 @@
package org.onap.dcae.restapi;
-import static java.util.Optional.ofNullable;
import static java.util.stream.StreamSupport.stream;
-import static org.springframework.http.ResponseEntity.ok;
+import static org.springframework.http.ResponseEntity.accepted;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
@@ -39,7 +38,6 @@ import javax.servlet.http.HttpServletRequest;
import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dcae.ApplicationSettings;
-import org.onap.dcae.CollectorSchemas;
import org.onap.dcae.commonFunction.VESLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,173 +52,152 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class VesRestController {
+ private static final Logger log = LoggerFactory.getLogger(VesRestController.class);
+
+ private final ApplicationSettings applicationSettings;
+ private final LinkedBlockingQueue<JSONObject> inputQueue;
+
+ private final Logger metricsLog;
+ private final Logger errorLog;
+ private final Logger incomingRequestsLogger;
+
+ @Autowired
+ VesRestController(ApplicationSettings applicationSettings,
+ @Qualifier("metricsLog") Logger metricsLog,
+ @Qualifier("errorLog") Logger errorLog,
+ @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
+ @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
+ this.applicationSettings = applicationSettings;
+ this.metricsLog = metricsLog;
+ this.errorLog = errorLog;
+ this.incomingRequestsLogger = incomingRequestsLogger;
+ this.inputQueue = inputQueue;
+ }
- private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
-
- private static final String FALLBACK_VES_VERSION = "v5";
-
- @Autowired private ApplicationSettings collectorProperties;
-
- @Autowired private CollectorSchemas schemas;
-
- @Autowired
- @Qualifier("metriclog")
- private Logger metriclog;
-
- @Autowired
- @Qualifier("incomingRequestsLogger")
- private Logger incomingRequestsLogger;
-
- @Autowired
- @Qualifier("errorLog")
- private Logger errorLog;
-
- private LinkedBlockingQueue<JSONObject> inputQueue;
- private String version;
-
- @Autowired
- VesRestController(
- @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
- @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
- this.incomingRequestsLogger = incomingRequestsLogger;
- this.inputQueue = inputQueue;
- }
-
- @GetMapping("/")
- String mainPage() {
- return "Welcome to VESCollector";
- }
-
- // refactor in next iteration
- @PostMapping(
- value = {
- "/eventListener/v1",
- "/eventListener/v1/eventBatch",
- "/eventListener/v2",
- "/eventListener/v2/eventBatch",
- "/eventListener/v3",
- "/eventListener/v3/eventBatch",
- "/eventListener/v4",
- "/eventListener/v4/eventBatch",
- "/eventListener/v5",
- "/eventListener/v5/eventBatch",
- "/eventListener/v7",
- "/eventListener/v7/eventBatch"
- },
- consumes = "application/json")
- ResponseEntity<String> receiveEvent(
- @RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
- String request = httpServletRequest.getRequestURI();
- extractVersion(request);
-
- JSONObject jsonObject;
- try {
- jsonObject = new JSONObject(jsonPayload);
- } catch (Exception e) {
- return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ @GetMapping("/")
+ String mainPage() {
+ return "Welcome to VESCollector";
}
- String uuid = setUpECOMPLoggingForRequest();
- incomingRequestsLogger.info(
- String.format(
- "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
- jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+ //refactor in next iteration
+ @PostMapping(value = {"/eventListener/v1",
+ "/eventListener/v1/eventBatch",
+ "/eventListener/v2",
+ "/eventListener/v2/eventBatch",
+ "/eventListener/v3",
+ "/eventListener/v3/eventBatch",
+ "/eventListener/v4",
+ "/eventListener/v4/eventBatch",
+ "/eventListener/v5",
+ "/eventListener/v5/eventBatch",
+ "/eventListener/v7",
+ "/eventListener/v7/eventBatch"}, consumes = "application/json")
+ ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
+ String request = httpServletRequest.getRequestURI();
+ String version = extractVersion(request);
+
+ JSONObject jsonObject;
+ try {
+ jsonObject = new JSONObject(jsonPayload);
+ } catch (Exception e) {
+ return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
+ }
- if (collectorProperties.jsonSchemaValidationEnabled()) {
- if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ String uuid = setUpECOMPLoggingForRequest();
+ incomingRequestsLogger.info(String.format(
+ "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
+ jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
+
+ if (applicationSettings.jsonSchemaValidationEnabled()) {
+ if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else {
+ return errorResponse(ApiException.INVALID_JSON_INPUT);
+ }
}
- } else if (!isBatchRequest(request)
- && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+
+ JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+
+ if (!putEventsOnProcessingQueue(commonlyFormatted)) {
+ errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
+ return errorResponse(ApiException.NO_SERVER_RESOURCES);
}
- } else {
- return errorResponse(ApiException.INVALID_JSON_INPUT);
- }
+ return accepted()
+ .contentType(MediaType.APPLICATION_JSON)
+ .body("Accepted");
}
- JSONArray commonlyFormatted =
- convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
+ private String extractVersion(String httpServletRequest) {
+ return httpServletRequest.split("/")[2];
+ }
- if (!putEventsOnProcessingQueue(commonlyFormatted)) {
- errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
- return errorResponse(ApiException.NO_SERVER_RESOURCES);
+ private ResponseEntity<String> errorResponse(ApiException noServerResources) {
+ return ResponseEntity.status(noServerResources.httpStatusCode)
+ .body(noServerResources.toJSON().toString());
}
- // HttpStatus.SC_NO_CONTENT
- return org.springframework.http.ResponseEntity.accepted()
- .contentType(MediaType.APPLICATION_JSON)
- .body("Accepted");
- }
-
- private void extractVersion(String httpServletRequest) {
- version = httpServletRequest.split("/")[2];
- }
-
- private ResponseEntity<String> errorResponse(ApiException noServerResources) {
- return ResponseEntity.status(noServerResources.httpStatusCode)
- .body(noServerResources.toJSON().toString());
- }
-
- private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
- for (int i = 0; i < arrayOfEvents.length(); i++) {
- metriclog.info("EVENT_PUBLISH_START");
- if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
- return false;
- }
+
+ private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
+ for (int i = 0; i < arrayOfEvents.length(); i++) {
+ metricsLog.info("EVENT_PUBLISH_START");
+ if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
+ return false;
+ }
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ metricsLog.info("EVENT_PUBLISH_END");
+ return true;
}
- LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- metriclog.info("EVENT_PUBLISH_END");
- return true;
- }
-
- private boolean conformsToSchema(JSONObject payload, String version) {
- try {
- JsonSchema schema =
- ofNullable(schemas.getJSONSchemasMap(version).get(version))
- .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
- ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
- if (!report.isSuccess()) {
- LOG.warn("Schema validation failed for event: " + payload);
- stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
- return false;
- }
- return report.isSuccess();
- } catch (Exception e) {
- throw new RuntimeException("Unable to validate against schema", e);
+
+ private boolean conformsToSchema(JSONObject payload, String version) {
+ try {
+ JsonSchema schema = applicationSettings.jsonSchema(version);
+ ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+ if (!report.isSuccess()) {
+ log.warn("Schema validation failed for event: " + payload);
+ stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
+ return false;
+ }
+ return report.isSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to validate against schema", e);
+ }
+ }
+
+ private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
+ String uuid, String version) {
+ JSONArray asArrayEvents = new JSONArray();
+ String vesUniqueIdKey = "VESuniqueId";
+ String vesVersionKey = "VESversion";
+ if (isBatchRequest(request)) {
+ JSONArray events = jsonObject.getJSONArray("eventList");
+ for (int i = 0; i < events.length(); i++) {
+ JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
+ event.put(vesUniqueIdKey, uuid + "-" + i);
+ event.put(vesVersionKey, version);
+ asArrayEvents.put(event);
+ }
+ } else {
+ jsonObject.put(vesUniqueIdKey, uuid);
+ jsonObject.put(vesVersionKey, version);
+ asArrayEvents = new JSONArray().put(jsonObject);
+ }
+ return asArrayEvents;
}
- }
-
- private static JSONArray convertToJSONArrayCommonFormat(
- JSONObject jsonObject, String request, String uuid, String version) {
- JSONArray asArrayEvents = new JSONArray();
- String vesUniqueIdKey = "VESuniqueId";
- String vesVersionKey = "VESversion";
- if (isBatchRequest(request)) {
- JSONArray events = jsonObject.getJSONArray("eventList");
- for (int i = 0; i < events.length(); i++) {
- JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
- event.put(vesUniqueIdKey, uuid + "-" + i);
- event.put(vesVersionKey, version);
- asArrayEvents.put(event);
- }
- } else {
- jsonObject.put(vesUniqueIdKey, uuid);
- jsonObject.put(vesVersionKey, version);
- asArrayEvents = new JSONArray().put(jsonObject);
+
+ private static String setUpECOMPLoggingForRequest() {
+ final UUID uuid = UUID.randomUUID();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ return uuid.toString();
+ }
+
+ private static boolean isBatchRequest(String request) {
+ return request.contains("eventBatch");
}
- return asArrayEvents;
- }
-
- private static String setUpECOMPLoggingForRequest() {
- final UUID uuid = UUID.randomUUID();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- return uuid.toString();
- }
-
- private static boolean isBatchRequest(String request) {
- return request.contains("eventBatch");
- }
-}
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
index 2ac42080..26b0a68b 100644
--- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
+++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
@@ -20,6 +20,11 @@ package org.onap.dcae;
* ============LICENSE_END=========================================================
*/
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
import org.json.JSONObject;
@@ -28,6 +33,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Objects;
@@ -35,6 +41,7 @@ import java.util.Objects;
import static java.util.Collections.singletonList;
import static org.junit.Assert.*;
import static org.onap.dcae.CLIUtils.processCmdLine;
+import static org.onap.dcae.TestingUtilities.createTemporaryFile;
public class ApplicationSettingsTest {
@@ -281,22 +288,25 @@ public class ApplicationSettingsTest {
}
@Test
- public void shouldReturnJSONSchema() throws IOException {
+ public void shouldReturnJSONSchema() throws IOException, ProcessingException {
// when
- JSONObject jsonSchema = fromTemporaryConfiguration("collector.schema.file={\"v1\": {}}")
- .jsonSchema();
+ String sampleJsonSchema = "{" +
+ " \"type\": \"object\"," +
+ " \"properties\": {" +
+ " \"state\": { \"type\": \"string\" }" +
+ " }" +
+ "}";
+ Path temporarySchemaFile = createTemporaryFile(sampleJsonSchema);
- // then
- assertEquals(new JSONObject("{\"v1\": {}}").toMap(), jsonSchema.toMap());
- }
-
- @Test
- public void shouldReturnDefaultJSONSchema() throws IOException {
// when
- JSONObject jsonSchema = fromTemporaryConfiguration().jsonSchema();
+ JsonSchema schema = fromTemporaryConfiguration(String.format("collector.schema.file={\"v1\": \"%s\"}", temporarySchemaFile))
+ .jsonSchema("v1");
// then
- assertEquals(new JSONObject("{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}").toMap(), jsonSchema.toMap());
+ JsonNode incorrectTestObject = new ObjectMapper().readTree("{ \"state\": 1 }");
+ JsonNode correctTestObject = new ObjectMapper().readTree("{ \"state\": \"hi\" }");
+ assertFalse(schema.validate(incorrectTestObject).isSuccess());
+ assertTrue(schema.validate(correctTestObject).isSuccess());
}
@Test
diff --git a/src/test/java/org/onap/dcae/TestingUtilities.java b/src/test/java/org/onap/dcae/TestingUtilities.java
index 0bbb6cc3..092983bf 100644
--- a/src/test/java/org/onap/dcae/TestingUtilities.java
+++ b/src/test/java/org/onap/dcae/TestingUtilities.java
@@ -82,7 +82,6 @@ public final class TestingUtilities {
T get() throws Exception;
}
-
public static void assertFailureHasInfo(Try any, String... msgPart) {
Java6Assertions.assertThat(any.isFailure()).isTrue();
AbstractThrowableAssert<?, ? extends Throwable> o = Java6Assertions.assertThat(any.getCause())
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
index f5c5d5f9..97dccb5b 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
@@ -29,7 +29,6 @@ import org.mockito.ArgumentCaptor;
import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.CLIUtils;
import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
-import org.onap.dcae.vestest.TestingUtilities;
import java.util.List;
@@ -43,12 +42,11 @@ public class EventProcessorTest {
private final String ev = "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
- Map<String, String[]> streamID;
+ private Map<String, String[]> streamID;
private ApplicationSettings properties;
@Before
public void setUp() {
- streamID = TestingUtilities.convertDMaaPStreamsPropertyToMap("fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling");
properties = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine);
streamID = properties.dMaaPStreamsMapping();
}
diff --git a/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java b/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java
deleted file mode 100644
index 9146cdac..00000000
--- a/src/test/java/org/onap/dcae/vestest/TestJsonSchemaValidation.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.vestest;
-
-import static java.nio.file.Files.readAllBytes;
-import static junit.framework.Assert.assertEquals;
-
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.io.IOException;
-import java.nio.file.Paths;
-import org.junit.Test;
-import org.onap.dcae.SchemaValidator;
-
-public class TestJsonSchemaValidation {
-
- @Test
- public void shouldValidEventPassSchema_27_2() throws IOException {
- String result =
- SchemaValidator.validateAgainstSchema(
- readJSONFromFile("src/test/resources/ves4_valid.json").toString(),
- readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
- assertEquals(result, "true");
- }
-
- @Test
- public void shouldInvalidEventDoesNotPassSchema_27_2() throws IOException {
- String result =
- SchemaValidator.validateAgainstSchema(
- readJSONFromFile("src/test/resources/ves4_invalid.json").toString(),
- readJSONFromFile("etc/CommonEventFormat_27.2.json").toString());
- assertEquals(result, "false");
- }
-
- @Test
- public void shouldValidEventPassSchema_30_0_1() throws IOException {
- String result =
- SchemaValidator.validateAgainstSchema(
- readJSONFromFile("src/test/resources/ves7_valid.json").toString(),
- readJSONFromFile("etc/CommonEventFormat_30.0.1.json").toString());
- assertEquals(result, "true");
- }
-
- @Test
- public void shouldValidEventBatchPassSchema_30_0_1() throws IOException {
- String result =
- SchemaValidator.validateAgainstSchema(
- readJSONFromFile("src/test/resources/ves7_batch_valid.json").toString(),
- readJSONFromFile("etc/CommonEventFormat_30.0.1.json").toString());
- assertEquals(result, "true");
- }
-
- @Test
- public void shouldInvalidEventDoesNotPassSchema_30_0_1() throws IOException {
- String result =
- SchemaValidator.validateAgainstSchema(
- readJSONFromFile("src/test/resources/ves7_invalid.json").toString(),
- readJSONFromFile("etc/CommonEventFormat_30.0.1.json").toString());
- assertEquals(result, "false");
- }
-
- private static JsonObject readJSONFromFile(String path) throws IOException {
- return (JsonObject) new JsonParser().parse(new String(readAllBytes(Paths.get(path))));
- }
-}
diff --git a/src/test/java/org/onap/dcae/vestest/TestingUtilities.java b/src/test/java/org/onap/dcae/vestest/TestingUtilities.java
deleted file mode 100644
index eff31f6d..00000000
--- a/src/test/java/org/onap/dcae/vestest/TestingUtilities.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.vestest;
-
-import static java.nio.file.Files.readAllBytes;
-
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.collection.HashMap;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-public final class TestingUtilities {
-
- private TestingUtilities() {
- // utility class, no objects allowed
- }
-
- static JsonObject readJSONFromFile(Path path) {
- return rethrow(() -> (JsonObject) new JsonParser().parse(new String(readAllBytes(path))));
- }
-
- static Path createTemporaryFile() {
- return rethrow(() -> {
- Path temporaryDirectory = Files.createTempDirectory("temporaryDirectory");
- Path temporaryFile = TestingUtilities.createFile(temporaryDirectory + "/testFile");
- TestingUtilities.scheduleToBeDeletedAfterTests(temporaryDirectory);
- TestingUtilities.scheduleToBeDeletedAfterTests(temporaryFile);
- return temporaryFile;
- });
- }
-
- public static HashMap<String, String[]> convertDMaaPStreamsPropertyToMap(String streamIdsProperty) {
- java.util.HashMap<String, String[]> domainToStreamIdsMapping = new java.util.HashMap<>();
- String[] topics = streamIdsProperty.split("\\|");
- for (String t : topics) {
- String domain = t.split("=")[0];
- String[] streamIds = t.split("=")[1].split(",");
- domainToStreamIdsMapping.put(domain, streamIds);
- }
- return HashMap.ofAll(domainToStreamIdsMapping);
- }
-
- private static Path createFile(String path) {
- return rethrow(() -> Files.createFile(Paths.get(path)));
- }
-
- private static void scheduleToBeDeletedAfterTests(Path path) {
- path.toFile().deleteOnExit();
- }
-
- /**
- * Exception in test case usually means there is something wrong, it should never be catched, but rather thrown to
- * be handled by JUnit framework.
- */
- private static <T> T rethrow(CheckedSupplier<T> supplier) {
- try {
- return supplier.get();
- } catch (Exception e) {
- throw new RuntimeException();
- }
- }
-
- @FunctionalInterface
- interface CheckedSupplier<T> {
-
- T get() throws Exception;
- }
-
-
-}