diff options
author | Zlatko Murgoski <zlatko.murgoski@nokia.com> | 2019-05-09 11:21:14 +0200 |
---|---|---|
committer | Zlatko Murgoski <zlatko.murgoski@nokia.com> | 2019-06-14 11:56:34 +0200 |
commit | 3c3c7ad09c02852cd0b4db03ecc9cc5c429cab08 (patch) | |
tree | 80221b2d825d878b6b6e860a5bae55f9317f9ec3 /src | |
parent | 2bff7994a2bf880694a4c967b488ce55f3911af2 (diff) |
VES Collector - Event Ordering
https://jira.onap.org/browse/DCAEGEN2-1483
Change-Id: I28b0e871ce570a3cf4c0d2e08d040b66eb6db3aa
Issue-ID: DCAEGEN2-1483
Signed-off-by: Zlatko Murgoski <zlatko.murgoski@nokia.com>
Diffstat (limited to 'src')
16 files changed, 348 insertions, 367 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 61bcf4b4..205659c4 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -84,38 +84,14 @@ public class ApplicationSettings { throw new ApplicationException(ex); } } - public void loadPropertiesFromFile() { - try { - properties.load(configurationFileLocation); - } catch (ConfigurationException ex) { - log.error("Cannot load properties cause:", ex); - throw new ApplicationException(ex); - } - } - public Map<String, String> validAuthorizationCredentials() { return prepareUsersMap(properties.getString("header.authlist", null)); } - private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) { - return allowedUsers == null ? HashMap.empty() - : List.of(allowedUsers.split("\\|")) - .map(t->t.split(",")) - .toMap(t-> t[0].trim(), t -> t[1].trim()); - } - - private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) { - return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); - } - public Path configurationFileLocation() { return Paths.get(configurationFileLocation); } - public int maximumAllowedQueuedEvents() { - return properties.getInt("collector.inputQueue.maxPending", 1024 * 4); - } - public boolean jsonSchemaValidationEnabled() { return properties.getInt("collector.schema.checkflag", -1) > 0; } @@ -126,22 +102,8 @@ public class ApplicationSettings { .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application.")); } - private Map<String, JsonSchema> loadJsonSchemas() { - return jsonSchema().toMap().entrySet().stream() - .map(this::readSchemaForVersion) - .collect(HashMap.collector()); - } - - private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) { - try { - String schemaContent = new String( - readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); - JsonNode schemaNode = JsonLoader.fromString(schemaContent); - JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); - return Tuple(versionToFilePath.getKey(), schema); - } catch (IOException | ProcessingException e) { - throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); - } + public boolean isVersionSupported(String version){ + return loadedJsonSchemas.containsKey(version); } public int httpPort() { @@ -183,6 +145,7 @@ public class ApplicationSettings { public String exceptionConfigFileLocation() { return properties.getString("exceptionConfig", null); } + public String dMaaPConfigurationFileLocation() { return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); } @@ -204,7 +167,16 @@ public class ApplicationSettings { } } - public void addOrUpdate(String key, String value) { + private void loadPropertiesFromFile() { + try { + properties.load(configurationFileLocation); + } catch (ConfigurationException ex) { + log.error("Cannot load properties cause:", ex); + throw new ApplicationException(ex); + } + } + + private void addOrUpdate(String key, String value) { if (properties.containsKey(key)) { properties.setProperty(key, value); } else { @@ -212,6 +184,35 @@ public class ApplicationSettings { } } + private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) { + return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); + } + + private Map<String, JsonSchema> loadJsonSchemas() { + return jsonSchema().toMap().entrySet().stream() + .map(this::readSchemaForVersion) + .collect(HashMap.collector()); + } + + private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) { + try { + String schemaContent = new String( + readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); + JsonNode schemaNode = JsonLoader.fromString(schemaContent); + JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); + return Tuple(versionToFilePath.getKey(), schema); + } catch (IOException | ProcessingException e) { + throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); + } + } + + private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) { + return allowedUsers == null ? HashMap.empty() + : List.of(allowedUsers.split("\\|")) + .map(t->t.split(",")) + .toMap(t-> t[0].trim(), t -> t[1].trim()); + } + private JSONObject jsonSchema() { return new JSONObject(properties.getString("collector.schema.file", format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION))); diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java index d658b4aa..e3340820 100644 --- a/src/main/java/org/onap/dcae/VesApplication.java +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -22,14 +22,9 @@ package org.onap.dcae; import io.vavr.collection.Map; import java.nio.file.Paths; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; import org.onap.dcae.common.publishing.EventPublisher; @@ -49,21 +44,16 @@ import org.springframework.context.annotation.Lazy; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class VesApplication { - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input"); private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error"); - private static final int MAX_THREADS = 20; - public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; private static ConfigLoader configLoader; - private static EventProcessor eventProcessor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static SpringApplication app; private static EventPublisher eventPublisher; private static ScheduledFuture<?> scheduleFeatures; - private static ExecutorService executor; public static void main(String[] args) { app = new SpringApplication(VesApplication.class); @@ -73,7 +63,6 @@ public class VesApplication { app.setAddCommandLineProperties(true); context = app.run(); configLoader.updateConfig(); - } public static void restartApplication() { @@ -89,7 +78,6 @@ public class VesApplication { } private static void init() { - fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); createConfigLoader(); createSchedulePoolExecutor(); createExecutors(); @@ -97,12 +85,6 @@ public class VesApplication { private static void createExecutors() { eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(new EventSender(eventPublisher, properties)); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } } private static void createSchedulePoolExecutor() { @@ -142,20 +124,15 @@ public class VesApplication { } @Bean - @Qualifier("metricsLog") - public Logger incomingRequestsMetricsLogger() { - return metriclog; - } - - @Bean @Qualifier("errorLog") public Logger errorLogger() { return errorLog; } @Bean - public LinkedBlockingQueue<JSONObject> inputQueue() { - return fProcessingInputQueue; + @Qualifier("eventSender") + public EventSender eventSender() { + return new EventSender(eventPublisher,properties); } } diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java deleted file mode 100644 index bf3bf70d..00000000 --- a/src/main/java/org/onap/dcae/common/EventProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import org.json.JSONObject;
-import org.onap.dcae.VesApplication;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventProcessor implements Runnable {
-
- private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
- private EventSender eventSender;
-
- public EventProcessor(EventSender eventSender) {
- this.eventSender = eventSender;
- }
-
- @Override
- public void run() {
- try {
- while (true){
- JSONObject event = VesApplication.fProcessingInputQueue.take();
- log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
- setLoggingContext(event);
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event));
- eventSender.send(event);
- log.debug("Message published" + event);
- }
- } catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
- Thread.currentThread().interrupt();
- }
- }
-
- private void setLoggingContext(JSONObject event) {
- LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString());
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- }
-}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 48268d6c..c1002af6 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -20,17 +20,12 @@ */ package org.onap.dcae.common; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.publishing.EventPublisher; import org.slf4j.Logger; @@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory; public class EventSender { - private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private Map<String, String[]> streamidHash; - private ApplicationSettings properties; private EventPublisher eventPublisher; - - private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType(); + private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final Logger log = LoggerFactory.getLogger(EventSender.class); private static final String EVENT_LITERAL = "event"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { this.eventPublisher = eventPublisher; this.streamidHash = properties.dMaaPStreamsMapping(); - this.properties = properties; - } - public void send(JSONObject event) { - streamidHash.get(getDomain(event)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) - .forEach(streamIds -> sendEventsToStreams(event, streamIds)); + public void send(JSONArray arrayOfEvents) { + for (int i = 0; i < arrayOfEvents.length(); i++) { + metriclog.info("EVENT_PUBLISH_START"); + JSONObject object = (JSONObject) arrayOfEvents.get(i); + setLoggingContext(object); + streamidHash.get(getDomain(object)) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object)) + .forEach(streamIds -> sendEventsToStreams(object, streamIds)); + log.debug("Message published" + object); + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + metriclog.info("EVENT_PUBLISH_END"); } - public static String getDomain(JSONObject event) { + private static String getDomain(JSONObject event) { return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); } private void sendEventsToStreams(JSONObject event, String[] streamIdList) { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); } } - private JSONObject overrideEvent(JSONObject event) { - JSONObject jsonObject = addCurrentTimeToEvent(event); - if (properties.eventTransformingEnabled()) { - try (FileReader fr = new FileReader("./etc/eventTransform.json")) { - log.info("parse eventTransform.json"); - List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE); - parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); - } catch (IOException e) { - log.error(COULD_NOT_FIND_FILE, e); - throw new ApplicationException(COULD_NOT_FIND_FILE, e); - } - } - if (jsonObject.has("VESversion")) - jsonObject.remove("VESversion"); - - log.debug("Modified event:" + jsonObject); - return jsonObject; - } - - private JSONObject addCurrentTimeToEvent(JSONObject event) { - final Date currentTime = new Date(); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - return event; - } - - private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { - for (Event eventTransform : eventsTransform) { - JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); - if (configProcessorAdapter.isFilterMet(filterObj)) { - callProcessorsMethod(configProcessorAdapter, eventTransform.processors); - } - } - } - - private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) { - for (Processor processor : processors) { - final String functionName = processor.functionName; - final JSONObject args = new JSONObject(processor.args.toString()); - log.info(String.format("functionName==%s | args==%s", functionName, args)); - try { - configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); - } catch (ReflectiveOperationException e) { - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } - } + private void setLoggingContext(JSONObject event) { + LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event)); } } diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java new file mode 100644 index 00000000..1caa4f18 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventUpdater.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventUpdater { + + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String VES_VERSION = "VESversion"; + private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType(); + private static final Logger log = LoggerFactory.getLogger(EventSender.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String EVENT_TRANSFORM = "./etc/eventTransform.json"; + private ApplicationSettings settings; + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + public EventUpdater(ApplicationSettings settings) { + this.settings = settings; + } + + public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){ + if(type.equalsIgnoreCase(EVENT_LIST)){ + return convertEvents(jsonObject, uuid.toString(), version); + } + else { + return convertEvent(jsonObject, uuid.toString(), version); + } + } + + private JSONArray convertEvents(JSONObject jsonObject, + String uuid, String version) { + JSONArray asArrayEvents = new JSONArray(); + + JSONArray events = jsonObject.getJSONArray(EVENT_LIST); + for (int i = 0; i < events.length(); i++) { + JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i)); + event.put(VES_UNIQUE_ID, uuid + "-" + i); + event.put(VES_VERSION, version); + asArrayEvents.put(overrideEvent(event)); + } + return asArrayEvents; + } + + private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) { + jsonObject.put(VES_UNIQUE_ID, uuid); + jsonObject.put(VES_VERSION, version); + return new JSONArray().put(overrideEvent(jsonObject)); + } + + private JSONObject overrideEvent(JSONObject event) { + JSONObject jsonObject = addCurrentTimeToEvent(event); + if (settings.eventTransformingEnabled()) { + try (FileReader fr = new FileReader(EVENT_TRANSFORM)) { + log.info("parse " + EVENT_TRANSFORM + " file"); + List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); + } catch (IOException e) { + log.error(COULD_NOT_FIND_FILE, e); + throw new ApplicationException(COULD_NOT_FIND_FILE, e); + } + } + if (jsonObject.has(VES_VERSION)) + jsonObject.remove(VES_VERSION); + log.debug("Modified event:" + jsonObject); + return jsonObject; + } + + private JSONObject addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + return event; + } + + private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) { + for (Processor processor : processors) { + //TODO try to remove refection + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + log.info(String.format("functionName==%s | args==%s", functionName, args)); + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } +} diff --git a/src/main/java/org/onap/dcae/restapi/EventValidator.java b/src/main/java/org/onap/dcae/restapi/EventValidator.java new file mode 100644 index 00000000..f119b507 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/EventValidator.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.restapi; + +import static java.util.stream.StreamSupport.stream; + +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import java.util.Optional; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; + +public class EventValidator { + + private static final Logger log = LoggerFactory.getLogger(EventValidator.class); + + private ApplicationSettings applicationSettings; + + public EventValidator(ApplicationSettings applicationSettings) { + this.applicationSettings = applicationSettings; + } + + public Optional<ResponseEntity<String>> validate(JSONObject jsonObject, String type, String version){ + if (applicationSettings.jsonSchemaValidationEnabled()) { + if (jsonObject.has(type)) { + if (!conformsToSchema(jsonObject, version)) { + return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); + } + } else { + return errorResponse(ApiException.INVALID_JSON_INPUT); + } + } + return Optional.empty(); + } + + private boolean conformsToSchema(JSONObject payload, String version) { + try { + JsonSchema schema = applicationSettings.jsonSchema(version); + ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); + if (report.isSuccess()) { + return true; + } + log.warn("Schema validation failed for event: " + payload); + stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); + return false; + } catch (Exception e) { + throw new ApplicationException("Unable to validate against schema", e); + } + } + + private Optional<ResponseEntity<String>> errorResponse(ApiException noServerResources) { + return Optional.of(ResponseEntity.status(noServerResources.httpStatusCode) + .body(noServerResources.toJSON().toString())); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java index 9c65619c..77c6802a 100644 --- a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java +++ b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java @@ -21,16 +21,20 @@ package org.onap.dcae.restapi; -import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; - -@Controller +@RestController public class HealthCheckController { + @GetMapping("/") + public String main() { + return "Welcome to VESCollector"; + } + @GetMapping("/healthcheck") public String healthCheck() { - return "hello"; + return "I'm good"; } } diff --git a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java index 60740a80..267db054 100644 --- a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java +++ b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java @@ -31,6 +31,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig{ + @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2) @@ -39,5 +40,4 @@ public class SwaggerConfig{ .paths(PathSelectors.any()) .build(); } - } diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 3102c31c..b18eb7bc 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -21,32 +21,27 @@ package org.onap.dcae.restapi; -import static java.util.stream.StreamSupport.stream; import static org.springframework.http.ResponseEntity.accepted; +import static org.springframework.http.ResponseEntity.badRequest; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.github.fge.jackson.JsonLoader; -import com.github.fge.jsonschema.core.report.ProcessingReport; -import com.github.fge.jsonschema.main.JsonSchema; - +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.http.HttpServletRequest; - import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.EventSender; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.EventUpdater; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class VesRestController { - private static final Logger log = LoggerFactory.getLogger(VesRestController.class); - private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString(); - private final ApplicationSettings applicationSettings; - private final LinkedBlockingQueue<JSONObject> inputQueue; - private final Logger metricsLog; - private final Logger errorLog; - private final Logger incomingRequestsLogger; + private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'"; + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private final ApplicationSettings settings; + private final Logger requestLogger; + private EventSender eventSender; @Autowired - VesRestController(ApplicationSettings applicationSettings, - @Qualifier("metricsLog") Logger metricsLog, - @Qualifier("errorLog") Logger errorLog, + VesRestController(ApplicationSettings settings, @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger, - @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) { - this.applicationSettings = applicationSettings; - this.metricsLog = metricsLog; - this.errorLog = errorLog; - this.incomingRequestsLogger = incomingRequestsLogger; - this.inputQueue = inputQueue; - } - - @GetMapping("/") - String mainPage() { - return "Welcome to VESCollector"; + @Qualifier("eventSender") EventSender eventSender) { + this.settings = settings; + this.requestLogger = incomingRequestsLogger; + this.eventSender = eventSender; } - //refactor in next iteration - @PostMapping(value = {"/eventListener/v1", - "/eventListener/v1/eventBatch", - "/eventListener/v2", - "/eventListener/v2/eventBatch", - "/eventListener/v3", - "/eventListener/v3/eventBatch", - "/eventListener/v4", - "/eventListener/v4/eventBatch", - "/eventListener/v5", - "/eventListener/v5/eventBatch", - "/eventListener/v7", - "/eventListener/v7/eventBatch"}, consumes = "application/json") - ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) { - String request = httpServletRequest.getRequestURI(); - String version = extractVersion(request); - - JSONObject jsonObject; - try { - jsonObject = new JSONObject(jsonPayload); - } catch (Exception e) { - log.error(INVALID_JSON); - return ResponseEntity.badRequest().body(INVALID_JSON); - } - - String uuid = setUpECOMPLoggingForRequest(); - incomingRequestsLogger.info(String.format( - "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'", - jsonObject, uuid, version, httpServletRequest.getRemoteHost())); - - if (applicationSettings.jsonSchemaValidationEnabled()) { - if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else { - return errorResponse(ApiException.INVALID_JSON_INPUT); - } + @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json") + ResponseEntity<String> event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(event, version, request, EVENT); } + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); + } - JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version); - if (!putEventsOnProcessingQueue(commonlyFormatted)) { - errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES); - return errorResponse(ApiException.NO_SERVER_RESOURCES); + @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json") + ResponseEntity<String> events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(events, version, request, EVENT_LIST); } - return accepted() - .contentType(MediaType.APPLICATION_JSON) - .body("Accepted"); + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); } - private String extractVersion(String httpServletRequest) { - return httpServletRequest.split("/")[2]; - } + private ResponseEntity<String> process(String events, String version, HttpServletRequest request, String type) { - private ResponseEntity<String> errorResponse(ApiException noServerResources) { - return ResponseEntity.status(noServerResources.httpStatusCode) - .body(noServerResources.toJSON().toString()); - } + JSONObject jsonObject = new JSONObject(events); - private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) { - for (int i = 0; i < arrayOfEvents.length(); i++) { - metricsLog.info("EVENT_PUBLISH_START"); - if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) { - return false; - } - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metricsLog.info("EVENT_PUBLISH_END"); - return true; - } + EventValidator eventValidator = new EventValidator(settings); + Optional<ResponseEntity<String>> validationResult = eventValidator.validate(jsonObject, type, version); - private boolean conformsToSchema(JSONObject payload, String version) { - try { - JsonSchema schema = applicationSettings.jsonSchema(version); - ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); - if (!report.isSuccess()) { - log.warn("Schema validation failed for event: " + payload); - stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); - return false; - } - return report.isSuccess(); - } catch (Exception e) { - throw new ApplicationException("Unable to validate against schema", e); + if (validationResult.isPresent()){ + return validationResult.get(); } + JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type); + eventSender.send(arrayOfEvents); + // TODO call service and return status, replace CambriaClient, split event to single object and list of them + return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted"); } - private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request, - String uuid, String version) { - JSONArray asArrayEvents = new JSONArray(); - String vesUniqueIdKey = "VESuniqueId"; - String vesVersionKey = "VESversion"; - if (isBatchRequest(request)) { - JSONArray events = jsonObject.getJSONArray("eventList"); - for (int i = 0; i < events.length(); i++) { - JSONObject event = new JSONObject().put("event", events.getJSONObject(i)); - event.put(vesUniqueIdKey, uuid + "-" + i); - event.put(vesVersionKey, version); - asArrayEvents.put(event); - } - } else { - jsonObject.put(vesUniqueIdKey, uuid); - jsonObject.put(vesVersionKey, version); - asArrayEvents = new JSONArray().put(jsonObject); - } - return asArrayEvents; + private UUID generateUUID(String version, String uri, JSONObject jsonObject) { + UUID uuid = UUID.randomUUID(); + setUpECOMPLoggingForRequest(uuid); + requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri)); + return uuid; } - private static String setUpECOMPLoggingForRequest() { - final UUID uuid = UUID.randomUUID(); + private static void setUpECOMPLoggingForRequest(UUID uuid) { LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - return uuid.toString(); - } - - private static boolean isBatchRequest(String request) { - return request.contains("eventBatch"); } }
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java index 7059c4e5..c3e2a5de 100644 --- a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java +++ b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java @@ -52,5 +52,4 @@ public class WebMvcConfig extends WebMvcConfigurationSupport { resolver.setSuffix(".html"); return resolver; } - } diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java index 60287aef..6b0023f8 100644 --- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java +++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java @@ -235,25 +235,6 @@ public class ApplicationSettingsTest { } @Test - public void shouldReturnMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration("collector.inputQueue.maxPending=10000") - .maximumAllowedQueuedEvents(); - - // then - assertEquals(10000, maximumAllowedQueuedEvents); - } - - @Test - public void shouldReturnDefaultMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration().maximumAllowedQueuedEvents(); - - // then - assertEquals(1024 * 4, maximumAllowedQueuedEvents); - } - - @Test public void shouldTellIfSchemaValidationIsEnabled() throws IOException { // when boolean jsonSchemaValidationEnabled = fromTemporaryConfiguration("collector.schema.checkflag=1") diff --git a/src/test/java/org/onap/dcae/TLSTestBase.java b/src/test/java/org/onap/dcae/TLSTestBase.java index 4dada129..df10ead9 100644 --- a/src/test/java/org/onap/dcae/TLSTestBase.java +++ b/src/test/java/org/onap/dcae/TLSTestBase.java @@ -24,6 +24,7 @@ package org.onap.dcae; import org.json.JSONObject; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; +import org.onap.dcae.common.EventSender; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; @@ -69,8 +70,8 @@ public class TLSTestBase { protected abstract class TestClassBase { @MockBean - @Qualifier("inputQueue") - protected LinkedBlockingQueue<JSONObject> queue; + @Qualifier("eventSender") + protected EventSender eventSender; @LocalServerPort private int port; diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java index aba3c2a9..f49d3cd8 100644 --- a/src/test/java/org/onap/dcae/common/EventSenderTest.java +++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import io.vavr.collection.HashMap; import io.vavr.collection.Map; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +40,6 @@ import org.onap.dcae.common.publishing.EventPublisher; @RunWith(MockitoJUnitRunner.Silent.class) public class EventSenderTest { - private String event = "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1544016106000000,\"eventId\":\"fault33\",\"timeZoneOffset\":\"UTC+00.00\",\"priority\":\"Normal\",\"version\":\"4.0.1\",\"nfVendorName\":\"Ericsson\",\"reportingEntityName\":\"1\",\"sequence\":1,\"domain\":\"fault\",\"lastEpochMicrosec\":1544016106000000,\"eventName\":\"Fault_KeyFileFault\",\"vesEventListenerVersion\":\"7.0.1\",\"sourceName\":\"1\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"KeyFileFault\",\"faultFieldsVersion\":\"4.0\",\"eventCategory\":\"PROCESSINGERRORALARM\",\"specificProblem\":\"License Key File Fault_1\",\"alarmAdditionalInformation\":{\"probableCause\":\"ConfigurationOrCustomizationError\",\"additionalText\":\"test_1\",\"source\":\"ManagedElement=1,SystemFunctions=1,Lm=1\"},\"eventSourceType\":\"Lm\",\"vfStatus\":\"Active\"}}}\n"; @Mock @@ -54,7 +54,10 @@ public class EventSenderTest { public void shouldntSendEventWhenStreamIdsIsEmpty() { when(settings.dMaaPStreamsMapping()).thenReturn(HashMap.empty()); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher,never()).sendEvent(any(),any()); } @@ -63,7 +66,10 @@ public class EventSenderTest { Map<String, String[]> streams = HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}); when(settings.dMaaPStreamsMapping()).thenReturn(streams); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher, times(2)).sendEvent(any(),any()); } }
\ No newline at end of file diff --git a/src/test/resources/controller-config_dmaap_ip.json b/src/test/resources/controller-config_dmaap_ip.json index 1cc6576b..f148db55 100644 --- a/src/test/resources/controller-config_dmaap_ip.json +++ b/src/test/resources/controller-config_dmaap_ip.json @@ -1,6 +1,5 @@ { "auth.method": "noAuth", - "collector.inputQueue.maxPending": 8096, "collector.schema.checkflag": 1, "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks", "tomcat.maxthreads": "200", diff --git a/src/test/resources/controller-config_singleline_ip.json b/src/test/resources/controller-config_singleline_ip.json index c3a8d067..a3974e0f 100644 --- a/src/test/resources/controller-config_singleline_ip.json +++ b/src/test/resources/controller-config_singleline_ip.json @@ -5,7 +5,6 @@ "tomcat.maxthreads": "200", "collector.dmaap.streamid": "fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling", "streams_subscribes": {}, - "collector.inputQueue.maxPending": "8096", "streams_publishes": { "ves-mobileflow": { "type": "message_router", diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties index 9450067a..0916211f 100644 --- a/src/test/resources/test_collector_ip_op.properties +++ b/src/test/resources/test_collector_ip_op.properties @@ -9,7 +9,6 @@ collector.dmaapfile=./etc/DmaapConfig.json auth.method=noAuth header.authlist=sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy event.transform.flag=1 -collector.inputQueue.maxPending = 8096 streams_subscribes = {} services_calls = {} tomcat.maxthreads = 200 |