From 3c3c7ad09c02852cd0b4db03ecc9cc5c429cab08 Mon Sep 17 00:00:00 2001 From: Zlatko Murgoski Date: Thu, 9 May 2019 11:21:14 +0200 Subject: VES Collector - Event Ordering https://jira.onap.org/browse/DCAEGEN2-1483 Change-Id: I28b0e871ce570a3cf4c0d2e08d040b66eb6db3aa Issue-ID: DCAEGEN2-1483 Signed-off-by: Zlatko Murgoski --- .../java/org/onap/dcae/ApplicationSettings.java | 83 +++++----- src/main/java/org/onap/dcae/VesApplication.java | 29 +--- .../java/org/onap/dcae/common/EventProcessor.java | 61 ------- .../java/org/onap/dcae/common/EventSender.java | 94 +++-------- .../java/org/onap/dcae/common/EventUpdater.java | 137 ++++++++++++++++ .../java/org/onap/dcae/restapi/EventValidator.java | 78 +++++++++ .../onap/dcae/restapi/HealthCheckController.java | 12 +- .../java/org/onap/dcae/restapi/SwaggerConfig.java | 2 +- .../org/onap/dcae/restapi/VesRestController.java | 179 +++++---------------- .../java/org/onap/dcae/restapi/WebMvcConfig.java | 1 - .../org/onap/dcae/ApplicationSettingsTest.java | 19 --- src/test/java/org/onap/dcae/TLSTestBase.java | 5 +- .../java/org/onap/dcae/common/EventSenderTest.java | 12 +- src/test/resources/controller-config_dmaap_ip.json | 1 - .../resources/controller-config_singleline_ip.json | 1 - src/test/resources/test_collector_ip_op.properties | 1 - 16 files changed, 348 insertions(+), 367 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/common/EventProcessor.java create mode 100644 src/main/java/org/onap/dcae/common/EventUpdater.java create mode 100644 src/main/java/org/onap/dcae/restapi/EventValidator.java (limited to 'src') diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 61bcf4b4..205659c4 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -84,38 +84,14 @@ public class ApplicationSettings { throw new ApplicationException(ex); } } - public void loadPropertiesFromFile() { - try { - properties.load(configurationFileLocation); - } catch (ConfigurationException ex) { - log.error("Cannot load properties cause:", ex); - throw new ApplicationException(ex); - } - } - public Map validAuthorizationCredentials() { return prepareUsersMap(properties.getString("header.authlist", null)); } - private Map prepareUsersMap(@Nullable String allowedUsers) { - return allowedUsers == null ? HashMap.empty() - : List.of(allowedUsers.split("\\|")) - .map(t->t.split(",")) - .toMap(t-> t[0].trim(), t -> t[1].trim()); - } - - private String findOutConfigurationFileLocation(Map parsedArgs) { - return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); - } - public Path configurationFileLocation() { return Paths.get(configurationFileLocation); } - public int maximumAllowedQueuedEvents() { - return properties.getInt("collector.inputQueue.maxPending", 1024 * 4); - } - public boolean jsonSchemaValidationEnabled() { return properties.getInt("collector.schema.checkflag", -1) > 0; } @@ -126,22 +102,8 @@ public class ApplicationSettings { .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application.")); } - private Map loadJsonSchemas() { - return jsonSchema().toMap().entrySet().stream() - .map(this::readSchemaForVersion) - .collect(HashMap.collector()); - } - - private Tuple2 readSchemaForVersion(java.util.Map.Entry versionToFilePath) { - try { - String schemaContent = new String( - readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); - JsonNode schemaNode = JsonLoader.fromString(schemaContent); - JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); - return Tuple(versionToFilePath.getKey(), schema); - } catch (IOException | ProcessingException e) { - throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); - } + public boolean isVersionSupported(String version){ + return loadedJsonSchemas.containsKey(version); } public int httpPort() { @@ -183,6 +145,7 @@ public class ApplicationSettings { public String exceptionConfigFileLocation() { return properties.getString("exceptionConfig", null); } + public String dMaaPConfigurationFileLocation() { return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); } @@ -204,7 +167,16 @@ public class ApplicationSettings { } } - public void addOrUpdate(String key, String value) { + private void loadPropertiesFromFile() { + try { + properties.load(configurationFileLocation); + } catch (ConfigurationException ex) { + log.error("Cannot load properties cause:", ex); + throw new ApplicationException(ex); + } + } + + private void addOrUpdate(String key, String value) { if (properties.containsKey(key)) { properties.setProperty(key, value); } else { @@ -212,6 +184,35 @@ public class ApplicationSettings { } } + private String findOutConfigurationFileLocation(Map parsedArgs) { + return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); + } + + private Map loadJsonSchemas() { + return jsonSchema().toMap().entrySet().stream() + .map(this::readSchemaForVersion) + .collect(HashMap.collector()); + } + + private Tuple2 readSchemaForVersion(java.util.Map.Entry versionToFilePath) { + try { + String schemaContent = new String( + readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); + JsonNode schemaNode = JsonLoader.fromString(schemaContent); + JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); + return Tuple(versionToFilePath.getKey(), schema); + } catch (IOException | ProcessingException e) { + throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); + } + } + + private Map prepareUsersMap(@Nullable String allowedUsers) { + return allowedUsers == null ? HashMap.empty() + : List.of(allowedUsers.split("\\|")) + .map(t->t.split(",")) + .toMap(t-> t[0].trim(), t -> t[1].trim()); + } + private JSONObject jsonSchema() { return new JSONObject(properties.getString("collector.schema.file", format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION))); diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java index d658b4aa..e3340820 100644 --- a/src/main/java/org/onap/dcae/VesApplication.java +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -22,14 +22,9 @@ package org.onap.dcae; import io.vavr.collection.Map; import java.nio.file.Paths; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; import org.onap.dcae.common.publishing.EventPublisher; @@ -49,21 +44,16 @@ import org.springframework.context.annotation.Lazy; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class VesApplication { - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input"); private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error"); - private static final int MAX_THREADS = 20; - public static LinkedBlockingQueue fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; private static ConfigLoader configLoader; - private static EventProcessor eventProcessor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static SpringApplication app; private static EventPublisher eventPublisher; private static ScheduledFuture scheduleFeatures; - private static ExecutorService executor; public static void main(String[] args) { app = new SpringApplication(VesApplication.class); @@ -73,7 +63,6 @@ public class VesApplication { app.setAddCommandLineProperties(true); context = app.run(); configLoader.updateConfig(); - } public static void restartApplication() { @@ -89,7 +78,6 @@ public class VesApplication { } private static void init() { - fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); createConfigLoader(); createSchedulePoolExecutor(); createExecutors(); @@ -97,12 +85,6 @@ public class VesApplication { private static void createExecutors() { eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(new EventSender(eventPublisher, properties)); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } } private static void createSchedulePoolExecutor() { @@ -141,12 +123,6 @@ public class VesApplication { return incomingRequestsLogger; } - @Bean - @Qualifier("metricsLog") - public Logger incomingRequestsMetricsLogger() { - return metriclog; - } - @Bean @Qualifier("errorLog") public Logger errorLogger() { @@ -154,8 +130,9 @@ public class VesApplication { } @Bean - public LinkedBlockingQueue inputQueue() { - return fProcessingInputQueue; + @Qualifier("eventSender") + public EventSender eventSender() { + return new EventSender(eventPublisher,properties); } } diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java deleted file mode 100644 index bf3bf70d..00000000 --- a/src/main/java/org/onap/dcae/common/EventProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import org.json.JSONObject; -import org.onap.dcae.VesApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventProcessor implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private EventSender eventSender; - - public EventProcessor(EventSender eventSender) { - this.eventSender = eventSender; - } - - @Override - public void run() { - try { - while (true){ - JSONObject event = VesApplication.fProcessingInputQueue.take(); - log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); - setLoggingContext(event); - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event)); - eventSender.send(event); - log.debug("Message published" + event); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - Thread.currentThread().interrupt(); - } - } - - private void setLoggingContext(JSONObject event) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 48268d6c..c1002af6 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -20,17 +20,12 @@ */ package org.onap.dcae.common; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.publishing.EventPublisher; import org.slf4j.Logger; @@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory; public class EventSender { - private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private Map streamidHash; - private ApplicationSettings properties; private EventPublisher eventPublisher; - - private static final Type EVENT_LIST_TYPE = new TypeToken>() {}.getType(); + private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final Logger log = LoggerFactory.getLogger(EventSender.class); private static final String EVENT_LITERAL = "event"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { this.eventPublisher = eventPublisher; this.streamidHash = properties.dMaaPStreamsMapping(); - this.properties = properties; - } - public void send(JSONObject event) { - streamidHash.get(getDomain(event)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) - .forEach(streamIds -> sendEventsToStreams(event, streamIds)); + public void send(JSONArray arrayOfEvents) { + for (int i = 0; i < arrayOfEvents.length(); i++) { + metriclog.info("EVENT_PUBLISH_START"); + JSONObject object = (JSONObject) arrayOfEvents.get(i); + setLoggingContext(object); + streamidHash.get(getDomain(object)) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object)) + .forEach(streamIds -> sendEventsToStreams(object, streamIds)); + log.debug("Message published" + object); + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + metriclog.info("EVENT_PUBLISH_END"); } - public static String getDomain(JSONObject event) { + private static String getDomain(JSONObject event) { return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); } private void sendEventsToStreams(JSONObject event, String[] streamIdList) { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); } } - private JSONObject overrideEvent(JSONObject event) { - JSONObject jsonObject = addCurrentTimeToEvent(event); - if (properties.eventTransformingEnabled()) { - try (FileReader fr = new FileReader("./etc/eventTransform.json")) { - log.info("parse eventTransform.json"); - List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); - parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); - } catch (IOException e) { - log.error(COULD_NOT_FIND_FILE, e); - throw new ApplicationException(COULD_NOT_FIND_FILE, e); - } - } - if (jsonObject.has("VESversion")) - jsonObject.remove("VESversion"); - - log.debug("Modified event:" + jsonObject); - return jsonObject; - } - - private JSONObject addCurrentTimeToEvent(JSONObject event) { - final Date currentTime = new Date(); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - return event; - } - - private void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { - for (Event eventTransform : eventsTransform) { - JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); - if (configProcessorAdapter.isFilterMet(filterObj)) { - callProcessorsMethod(configProcessorAdapter, eventTransform.processors); - } - } - } - - private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { - for (Processor processor : processors) { - final String functionName = processor.functionName; - final JSONObject args = new JSONObject(processor.args.toString()); - log.info(String.format("functionName==%s | args==%s", functionName, args)); - try { - configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); - } catch (ReflectiveOperationException e) { - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } - } + private void setLoggingContext(JSONObject event) { + LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event)); } } diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java new file mode 100644 index 00000000..1caa4f18 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventUpdater.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventUpdater { + + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String VES_VERSION = "VESversion"; + private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Type EVENT_LIST_TYPE = new TypeToken>() {}.getType(); + private static final Logger log = LoggerFactory.getLogger(EventSender.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String EVENT_TRANSFORM = "./etc/eventTransform.json"; + private ApplicationSettings settings; + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + public EventUpdater(ApplicationSettings settings) { + this.settings = settings; + } + + public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){ + if(type.equalsIgnoreCase(EVENT_LIST)){ + return convertEvents(jsonObject, uuid.toString(), version); + } + else { + return convertEvent(jsonObject, uuid.toString(), version); + } + } + + private JSONArray convertEvents(JSONObject jsonObject, + String uuid, String version) { + JSONArray asArrayEvents = new JSONArray(); + + JSONArray events = jsonObject.getJSONArray(EVENT_LIST); + for (int i = 0; i < events.length(); i++) { + JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i)); + event.put(VES_UNIQUE_ID, uuid + "-" + i); + event.put(VES_VERSION, version); + asArrayEvents.put(overrideEvent(event)); + } + return asArrayEvents; + } + + private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) { + jsonObject.put(VES_UNIQUE_ID, uuid); + jsonObject.put(VES_VERSION, version); + return new JSONArray().put(overrideEvent(jsonObject)); + } + + private JSONObject overrideEvent(JSONObject event) { + JSONObject jsonObject = addCurrentTimeToEvent(event); + if (settings.eventTransformingEnabled()) { + try (FileReader fr = new FileReader(EVENT_TRANSFORM)) { + log.info("parse " + EVENT_TRANSFORM + " file"); + List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); + } catch (IOException e) { + log.error(COULD_NOT_FIND_FILE, e); + throw new ApplicationException(COULD_NOT_FIND_FILE, e); + } + } + if (jsonObject.has(VES_VERSION)) + jsonObject.remove(VES_VERSION); + log.debug("Modified event:" + jsonObject); + return jsonObject; + } + + private JSONObject addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + return event; + } + + private void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { + for (Processor processor : processors) { + //TODO try to remove refection + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + log.info(String.format("functionName==%s | args==%s", functionName, args)); + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } +} diff --git a/src/main/java/org/onap/dcae/restapi/EventValidator.java b/src/main/java/org/onap/dcae/restapi/EventValidator.java new file mode 100644 index 00000000..f119b507 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/EventValidator.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.restapi; + +import static java.util.stream.StreamSupport.stream; + +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import java.util.Optional; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; + +public class EventValidator { + + private static final Logger log = LoggerFactory.getLogger(EventValidator.class); + + private ApplicationSettings applicationSettings; + + public EventValidator(ApplicationSettings applicationSettings) { + this.applicationSettings = applicationSettings; + } + + public Optional> validate(JSONObject jsonObject, String type, String version){ + if (applicationSettings.jsonSchemaValidationEnabled()) { + if (jsonObject.has(type)) { + if (!conformsToSchema(jsonObject, version)) { + return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); + } + } else { + return errorResponse(ApiException.INVALID_JSON_INPUT); + } + } + return Optional.empty(); + } + + private boolean conformsToSchema(JSONObject payload, String version) { + try { + JsonSchema schema = applicationSettings.jsonSchema(version); + ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); + if (report.isSuccess()) { + return true; + } + log.warn("Schema validation failed for event: " + payload); + stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); + return false; + } catch (Exception e) { + throw new ApplicationException("Unable to validate against schema", e); + } + } + + private Optional> errorResponse(ApiException noServerResources) { + return Optional.of(ResponseEntity.status(noServerResources.httpStatusCode) + .body(noServerResources.toJSON().toString())); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java index 9c65619c..77c6802a 100644 --- a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java +++ b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java @@ -21,16 +21,20 @@ package org.onap.dcae.restapi; -import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; - -@Controller +@RestController public class HealthCheckController { + @GetMapping("/") + public String main() { + return "Welcome to VESCollector"; + } + @GetMapping("/healthcheck") public String healthCheck() { - return "hello"; + return "I'm good"; } } diff --git a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java index 60740a80..267db054 100644 --- a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java +++ b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java @@ -31,6 +31,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig{ + @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2) @@ -39,5 +40,4 @@ public class SwaggerConfig{ .paths(PathSelectors.any()) .build(); } - } diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 3102c31c..b18eb7bc 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -21,32 +21,27 @@ package org.onap.dcae.restapi; -import static java.util.stream.StreamSupport.stream; import static org.springframework.http.ResponseEntity.accepted; +import static org.springframework.http.ResponseEntity.badRequest; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.github.fge.jackson.JsonLoader; -import com.github.fge.jsonschema.core.report.ProcessingReport; -import com.github.fge.jsonschema.main.JsonSchema; - +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.http.HttpServletRequest; - import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.EventSender; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.EventUpdater; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class VesRestController { - private static final Logger log = LoggerFactory.getLogger(VesRestController.class); - private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString(); - private final ApplicationSettings applicationSettings; - private final LinkedBlockingQueue inputQueue; - private final Logger metricsLog; - private final Logger errorLog; - private final Logger incomingRequestsLogger; + private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'"; + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private final ApplicationSettings settings; + private final Logger requestLogger; + private EventSender eventSender; @Autowired - VesRestController(ApplicationSettings applicationSettings, - @Qualifier("metricsLog") Logger metricsLog, - @Qualifier("errorLog") Logger errorLog, + VesRestController(ApplicationSettings settings, @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger, - @Qualifier("inputQueue") LinkedBlockingQueue inputQueue) { - this.applicationSettings = applicationSettings; - this.metricsLog = metricsLog; - this.errorLog = errorLog; - this.incomingRequestsLogger = incomingRequestsLogger; - this.inputQueue = inputQueue; - } - - @GetMapping("/") - String mainPage() { - return "Welcome to VESCollector"; + @Qualifier("eventSender") EventSender eventSender) { + this.settings = settings; + this.requestLogger = incomingRequestsLogger; + this.eventSender = eventSender; } - //refactor in next iteration - @PostMapping(value = {"/eventListener/v1", - "/eventListener/v1/eventBatch", - "/eventListener/v2", - "/eventListener/v2/eventBatch", - "/eventListener/v3", - "/eventListener/v3/eventBatch", - "/eventListener/v4", - "/eventListener/v4/eventBatch", - "/eventListener/v5", - "/eventListener/v5/eventBatch", - "/eventListener/v7", - "/eventListener/v7/eventBatch"}, consumes = "application/json") - ResponseEntity receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) { - String request = httpServletRequest.getRequestURI(); - String version = extractVersion(request); - - JSONObject jsonObject; - try { - jsonObject = new JSONObject(jsonPayload); - } catch (Exception e) { - log.error(INVALID_JSON); - return ResponseEntity.badRequest().body(INVALID_JSON); - } - - String uuid = setUpECOMPLoggingForRequest(); - incomingRequestsLogger.info(String.format( - "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'", - jsonObject, uuid, version, httpServletRequest.getRemoteHost())); - - if (applicationSettings.jsonSchemaValidationEnabled()) { - if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else { - return errorResponse(ApiException.INVALID_JSON_INPUT); - } + @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json") + ResponseEntity event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(event, version, request, EVENT); } + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); + } - JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version); - if (!putEventsOnProcessingQueue(commonlyFormatted)) { - errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES); - return errorResponse(ApiException.NO_SERVER_RESOURCES); + @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json") + ResponseEntity events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(events, version, request, EVENT_LIST); } - return accepted() - .contentType(MediaType.APPLICATION_JSON) - .body("Accepted"); + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); } - private String extractVersion(String httpServletRequest) { - return httpServletRequest.split("/")[2]; - } + private ResponseEntity process(String events, String version, HttpServletRequest request, String type) { - private ResponseEntity errorResponse(ApiException noServerResources) { - return ResponseEntity.status(noServerResources.httpStatusCode) - .body(noServerResources.toJSON().toString()); - } + JSONObject jsonObject = new JSONObject(events); - private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) { - for (int i = 0; i < arrayOfEvents.length(); i++) { - metricsLog.info("EVENT_PUBLISH_START"); - if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) { - return false; - } - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metricsLog.info("EVENT_PUBLISH_END"); - return true; - } + EventValidator eventValidator = new EventValidator(settings); + Optional> validationResult = eventValidator.validate(jsonObject, type, version); - private boolean conformsToSchema(JSONObject payload, String version) { - try { - JsonSchema schema = applicationSettings.jsonSchema(version); - ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); - if (!report.isSuccess()) { - log.warn("Schema validation failed for event: " + payload); - stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); - return false; - } - return report.isSuccess(); - } catch (Exception e) { - throw new ApplicationException("Unable to validate against schema", e); + if (validationResult.isPresent()){ + return validationResult.get(); } + JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type); + eventSender.send(arrayOfEvents); + // TODO call service and return status, replace CambriaClient, split event to single object and list of them + return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted"); } - private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request, - String uuid, String version) { - JSONArray asArrayEvents = new JSONArray(); - String vesUniqueIdKey = "VESuniqueId"; - String vesVersionKey = "VESversion"; - if (isBatchRequest(request)) { - JSONArray events = jsonObject.getJSONArray("eventList"); - for (int i = 0; i < events.length(); i++) { - JSONObject event = new JSONObject().put("event", events.getJSONObject(i)); - event.put(vesUniqueIdKey, uuid + "-" + i); - event.put(vesVersionKey, version); - asArrayEvents.put(event); - } - } else { - jsonObject.put(vesUniqueIdKey, uuid); - jsonObject.put(vesVersionKey, version); - asArrayEvents = new JSONArray().put(jsonObject); - } - return asArrayEvents; + private UUID generateUUID(String version, String uri, JSONObject jsonObject) { + UUID uuid = UUID.randomUUID(); + setUpECOMPLoggingForRequest(uuid); + requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri)); + return uuid; } - private static String setUpECOMPLoggingForRequest() { - final UUID uuid = UUID.randomUUID(); + private static void setUpECOMPLoggingForRequest(UUID uuid) { LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - return uuid.toString(); - } - - private static boolean isBatchRequest(String request) { - return request.contains("eventBatch"); } } \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java index 7059c4e5..c3e2a5de 100644 --- a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java +++ b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java @@ -52,5 +52,4 @@ public class WebMvcConfig extends WebMvcConfigurationSupport { resolver.setSuffix(".html"); return resolver; } - } diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java index 60287aef..6b0023f8 100644 --- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java +++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java @@ -234,25 +234,6 @@ public class ApplicationSettingsTest { assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation); } - @Test - public void shouldReturnMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration("collector.inputQueue.maxPending=10000") - .maximumAllowedQueuedEvents(); - - // then - assertEquals(10000, maximumAllowedQueuedEvents); - } - - @Test - public void shouldReturnDefaultMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration().maximumAllowedQueuedEvents(); - - // then - assertEquals(1024 * 4, maximumAllowedQueuedEvents); - } - @Test public void shouldTellIfSchemaValidationIsEnabled() throws IOException { // when diff --git a/src/test/java/org/onap/dcae/TLSTestBase.java b/src/test/java/org/onap/dcae/TLSTestBase.java index 4dada129..df10ead9 100644 --- a/src/test/java/org/onap/dcae/TLSTestBase.java +++ b/src/test/java/org/onap/dcae/TLSTestBase.java @@ -24,6 +24,7 @@ package org.onap.dcae; import org.json.JSONObject; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; +import org.onap.dcae.common.EventSender; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; @@ -69,8 +70,8 @@ public class TLSTestBase { protected abstract class TestClassBase { @MockBean - @Qualifier("inputQueue") - protected LinkedBlockingQueue queue; + @Qualifier("eventSender") + protected EventSender eventSender; @LocalServerPort private int port; diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java index aba3c2a9..f49d3cd8 100644 --- a/src/test/java/org/onap/dcae/common/EventSenderTest.java +++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import io.vavr.collection.HashMap; import io.vavr.collection.Map; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +40,6 @@ import org.onap.dcae.common.publishing.EventPublisher; @RunWith(MockitoJUnitRunner.Silent.class) public class EventSenderTest { - private String event = "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1544016106000000,\"eventId\":\"fault33\",\"timeZoneOffset\":\"UTC+00.00\",\"priority\":\"Normal\",\"version\":\"4.0.1\",\"nfVendorName\":\"Ericsson\",\"reportingEntityName\":\"1\",\"sequence\":1,\"domain\":\"fault\",\"lastEpochMicrosec\":1544016106000000,\"eventName\":\"Fault_KeyFileFault\",\"vesEventListenerVersion\":\"7.0.1\",\"sourceName\":\"1\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"KeyFileFault\",\"faultFieldsVersion\":\"4.0\",\"eventCategory\":\"PROCESSINGERRORALARM\",\"specificProblem\":\"License Key File Fault_1\",\"alarmAdditionalInformation\":{\"probableCause\":\"ConfigurationOrCustomizationError\",\"additionalText\":\"test_1\",\"source\":\"ManagedElement=1,SystemFunctions=1,Lm=1\"},\"eventSourceType\":\"Lm\",\"vfStatus\":\"Active\"}}}\n"; @Mock @@ -54,7 +54,10 @@ public class EventSenderTest { public void shouldntSendEventWhenStreamIdsIsEmpty() { when(settings.dMaaPStreamsMapping()).thenReturn(HashMap.empty()); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher,never()).sendEvent(any(),any()); } @@ -63,7 +66,10 @@ public class EventSenderTest { Map streams = HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}); when(settings.dMaaPStreamsMapping()).thenReturn(streams); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher, times(2)).sendEvent(any(),any()); } } \ No newline at end of file diff --git a/src/test/resources/controller-config_dmaap_ip.json b/src/test/resources/controller-config_dmaap_ip.json index 1cc6576b..f148db55 100644 --- a/src/test/resources/controller-config_dmaap_ip.json +++ b/src/test/resources/controller-config_dmaap_ip.json @@ -1,6 +1,5 @@ { "auth.method": "noAuth", - "collector.inputQueue.maxPending": 8096, "collector.schema.checkflag": 1, "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks", "tomcat.maxthreads": "200", diff --git a/src/test/resources/controller-config_singleline_ip.json b/src/test/resources/controller-config_singleline_ip.json index c3a8d067..a3974e0f 100644 --- a/src/test/resources/controller-config_singleline_ip.json +++ b/src/test/resources/controller-config_singleline_ip.json @@ -5,7 +5,6 @@ "tomcat.maxthreads": "200", "collector.dmaap.streamid": "fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling", "streams_subscribes": {}, - "collector.inputQueue.maxPending": "8096", "streams_publishes": { "ves-mobileflow": { "type": "message_router", diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties index 9450067a..0916211f 100644 --- a/src/test/resources/test_collector_ip_op.properties +++ b/src/test/resources/test_collector_ip_op.properties @@ -9,7 +9,6 @@ collector.dmaapfile=./etc/DmaapConfig.json auth.method=noAuth header.authlist=sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy event.transform.flag=1 -collector.inputQueue.maxPending = 8096 streams_subscribes = {} services_calls = {} tomcat.maxthreads = 200 -- cgit 1.2.3-korg