From 3c3c7ad09c02852cd0b4db03ecc9cc5c429cab08 Mon Sep 17 00:00:00 2001 From: Zlatko Murgoski Date: Thu, 9 May 2019 11:21:14 +0200 Subject: VES Collector - Event Ordering https://jira.onap.org/browse/DCAEGEN2-1483 Change-Id: I28b0e871ce570a3cf4c0d2e08d040b66eb6db3aa Issue-ID: DCAEGEN2-1483 Signed-off-by: Zlatko Murgoski --- dpo/blueprint/blueprint_ves.yaml | 387 ++++++++++++--------- dpo/data-formats/ConsulConfig.json | 1 - dpo/spec/vescollector-componentspec.json | 15 +- dpo/tosca_model/schema.yaml | 2 - dpo/tosca_model/template.yaml | 1 - dpo/tosca_model/translate.yaml | 4 - etc/collector.properties | 9 - .../java/org/onap/dcae/ApplicationSettings.java | 83 ++--- src/main/java/org/onap/dcae/VesApplication.java | 29 +- .../java/org/onap/dcae/common/EventProcessor.java | 61 ---- .../java/org/onap/dcae/common/EventSender.java | 94 ++--- .../java/org/onap/dcae/common/EventUpdater.java | 137 ++++++++ .../java/org/onap/dcae/restapi/EventValidator.java | 78 +++++ .../onap/dcae/restapi/HealthCheckController.java | 12 +- .../java/org/onap/dcae/restapi/SwaggerConfig.java | 2 +- .../org/onap/dcae/restapi/VesRestController.java | 179 +++------- .../java/org/onap/dcae/restapi/WebMvcConfig.java | 1 - .../org/onap/dcae/ApplicationSettingsTest.java | 19 - src/test/java/org/onap/dcae/TLSTestBase.java | 5 +- .../java/org/onap/dcae/common/EventSenderTest.java | 12 +- src/test/resources/controller-config_dmaap_ip.json | 1 - .../resources/controller-config_singleline_ip.json | 1 - src/test/resources/test_collector_ip_op.properties | 1 - 23 files changed, 581 insertions(+), 553 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/common/EventProcessor.java create mode 100644 src/main/java/org/onap/dcae/common/EventUpdater.java create mode 100644 src/main/java/org/onap/dcae/restapi/EventValidator.java diff --git a/dpo/blueprint/blueprint_ves.yaml b/dpo/blueprint/blueprint_ves.yaml index 2dbc5a66..43158f1c 100644 --- a/dpo/blueprint/blueprint_ves.yaml +++ b/dpo/blueprint/blueprint_ves.yaml @@ -15,169 +15,240 @@ # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -tosca_definitions_version: cloudify_dsl_1_3 - -description: > - This handcrafted blueprint will install the ves collector and provision the needed message router topics. This blueprint can be used to verify that a platform installation is operational and working correctly. +tosca_definitions_version: cloudify_dsl_1_3 imports: -- http://www.getcloudify.org/spec/cloudify/3.4/types.yaml -- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/docker/2.2.0/node-type.yaml -- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/relationship/1.0.0/node-type.yaml -- http://NEXUS_REPO_HOST:8081/repository/NEXUS_RAW/type_files/dmaap/dmaap_mr.yaml - + - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml + - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.5/k8splugin_types.yaml + - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/dcaepolicyplugin/2.3.0/dcaepolicyplugin_types.yaml inputs: - - service_id: - description: Unique id used for an instance of this DCAE service. Use deployment id - default: 'foobar' - location_id: - default: 'solutioning-central' - docker_host_override: - default: 'component_dockerhost' - - topic00_aaf_username: - topic00_aaf_password: - topic00_location: - default: mtc5 - topic00_client_role: - default: com.att.dcae.member - - topic01_aaf_username: - topic01_aaf_password: - topic01_location: - default: mtc5 - topic01_client_role: - default: com.att.dcae.member - - topic02_aaf_username: - topic02_aaf_password: - topic02_location: - default: mtc5 - topic02_client_role: - default: com.att.dcae.member - - topic03_aaf_username: - topic03_aaf_password: - topic03_location: - default: mtc5 - topic03_client_role: - default: com.att.dcae.member - + collector.dmaap.streamid: + type: string + default: "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary" + external_port: + type: string + description: Kubernetes node port on which collector is exposed + default: "30235" + header.authlist: + type: string + default: "sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy" + log_directory: + type: string + default: "/opt/app/VESCollector/logs" + replicas: + type: integer + description: number of instances + default: 1 + tag_version: + type: string + default: "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3" + ves_fault_publish_url: + type: string + ves_fault_secondary_publish_url: + type: string + ves_heartbeat_publish_url: + type: string + ves_heartbeat_secondary_publish_url: + type: string + ves_measurement_publish_url: + type: string + ves_measurement_secondary_publish_url: + type: string + ves_mobileflow_publish_url: + type: string + ves_mobileflow_secondary_publish_url: + type: string + ves_notification_publish_url: + type: string + ves_notification_secondary_publish_url: + type: string + ves_other_publish_url: + type: string + ves_other_secondary_publish_url: + type: string + ves_pnfRegistration_publish_url: + type: string + ves_pnfRegistration_secondary_publish_url: + type: string + ves_sipsignaling_publish_url: + type: string + ves_sipsignaling_secondary_publish_url: + type: string + ves_statechange_publish_url: + type: string + ves_statechange_secondary_publish_url: + type: string + ves_syslog_publish_url: + type: string + ves_syslog_secondary_publish_url: + type: string + ves_thresholdCrossingAlert_publish_url: + type: string + ves_thresholdCrossingAlert_secondary_publish_url: + type: string + ves_voicequality_publish_url: + type: string + ves_voicequality_secondary_publish_url: + type: string node_templates: - - topic00: - type: dcae.nodes.Topic - properties: - topic_name: sec-fault-unsecure - - topic01: - type: dcae.nodes.Topic - properties: - topic_name: sec-measurement - - topic02: - type: dcae.nodes.Topic - properties: - topic_name: sec-measurement-unsecure - - topic03: - type: dcae.nodes.Topic - properties: - topic_name: sec-fault - - component00: - type: dcae.nodes.DockerContainerForComponentsUsingDmaap - properties: - service_component_type: - 'dcae-controller-ves-collector' - service_id: - { get_input: service_id } - location_id: - { get_input: location_id } - application_config: - collector.keystore.passwordfile: "/opt/app/dcae-certificate/.password" - collector.service.secure.port: -1 - tomcat.maxthreads: '200' - collector.keystore.file.location: "/opt/app/dcae-certificate/keystore.jks" - auth.method: "noAuth" - collector.service.port: 8080 - streams_publishes: - sec_fault_unsecure: - aaf_password: { get_input: topic00_aaf_password } - dmaap_info: "<>" - type: message_router - aaf_username: { get_input: topic00_aaf_username } - sec_measurement: - aaf_password: { get_input: topic01_aaf_password } - aaf_username: { get_input: topic01_aaf_username } - type: message_router - dmaap_info: "<>" - sec_measurement_unsecure: - aaf_password: { get_input: topic02_aaf_password } - aaf_username: { get_input: topic02_aaf_username } - dmaap_info: "<>" - type: message_router - sec_fault: - aaf_password: { get_input: topic03_aaf_password } - aaf_username: { get_input: topic03_aaf_username } - dmaap_info: "<>" - type: message_router - services_calls: {} - collector.schema.checkflag: 1 - collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert - header.authlist: userid1,base64encodepwd1|userid2,base64encodepwd2 - streams_subscribes: {} - collector.inputQueue.maxPending: 8096 - collector.schema.file: "./etc/CommonEventFormat_27.2.json" - image: - NEXUS_REPO_HOST:18443/dcae-dev-raw/dcae-controller-ves-collector:1.1.3 - docker_config: - healthcheck: - type: "http" - interval: "15s" - timeout: "1s" - endpoint: "/" - streams_publishes: - - name: topic00 - location: { get_input: topic00_location } - client_role: { get_input: topic00_client_role } - type: message_router - - name: topic01 - location: { get_input: topic01_location } - client_role: { get_input: topic01_client_role } - type: message_router - - name: topic02 - location: { get_input: topic02_location } - client_role: { get_input: topic02_client_role } - type: message_router - - name: topic03 - location: { get_input: topic03_location } - client_role: { get_input: topic03_client_role } - type: message_router - streams_subscribes: [] - relationships: - - type: dcae.relationships.component_contained_in - target: docker_host - - type: dcae.relationships.publish_events - target: topic00 - - type: dcae.relationships.publish_events - target: topic01 - - type: dcae.relationships.publish_events - target: topic02 - - type: dcae.relationships.publish_events - target: topic03 + dcae-ves-collector: + type: dcae.nodes.ContainerizedPlatformComponent interfaces: cloudify.interfaces.lifecycle: - stop: + start: inputs: - cleanup_image: - True - - docker_host: - type: dcae.nodes.SelectedDockerHost + ports: + - concat: ["8443:", {get_input: external_port }] properties: - location_id: - { get_input: location_id } - docker_host_override: - { get_input: docker_host_override } + application_config: + service_calls: [] + stream_publishes: + ves-fault: + dmaap_info: + topic_url: + get_input: ves_fault_publish_url + type: message router + ves-fault-secondary: + dmaap_info: + topic_url: + get_input: ves_fault_secondary_publish_url + type: message router + ves-heartbeat: + dmaap_info: + topic_url: + get_input: ves_heartbeat_publish_url + type: message router + ves-heartbeat-secondary: + dmaap_info: + topic_url: + get_input: ves_heartbeat_secondary_publish_url + type: message router + ves-measurement: + dmaap_info: + topic_url: + get_input: ves_measurement_publish_url + type: message router + ves-measurement-secondary: + dmaap_info: + topic_url: + get_input: ves_measurement_secondary_publish_url + type: message router + ves-mobileflow: + dmaap_info: + topic_url: + get_input: ves_mobileflow_publish_url + type: message router + ves-mobileflow-secondary: + dmaap_info: + topic_url: + get_input: ves_mobileflow_secondary_publish_url + type: message router + ves-notification: + dmaap_info: + topic_url: + get_input: ves_notification_publish_url + type: message router + ves-notification-secondary: + dmaap_info: + topic_url: + get_input: ves_notification_secondary_publish_url + type: message router + ves-other: + dmaap_info: + topic_url: + get_input: ves_other_publish_url + type: message router + ves-other-secondary: + dmaap_info: + topic_url: + get_input: ves_other_secondary_publish_url + type: message router + ves-pnfRegistration: + dmaap_info: + topic_url: + get_input: ves_pnfRegistration_publish_url + type: message router + ves-pnfRegistration-secondary: + dmaap_info: + topic_url: + get_input: ves_pnfRegistration_secondary_publish_url + type: message router + ves-sipsignaling: + dmaap_info: + topic_url: + get_input: ves_sipsignaling_publish_url + type: message router + ves-sipsignaling-secondary: + dmaap_info: + topic_url: + get_input: ves_sipsignaling_secondary_publish_url + type: message router + ves-statechange: + dmaap_info: + topic_url: + get_input: ves_statechange_publish_url + type: message router + ves-statechange-secondary: + dmaap_info: + topic_url: + get_input: ves_statechange_secondary_publish_url + type: message router + ves-syslog: + dmaap_info: + topic_url: + get_input: ves_syslog_publish_url + type: message router + ves-syslog-secondary: + dmaap_info: + topic_url: + get_input: ves_syslog_secondary_publish_url + type: message router + ves-thresholdCrossingAlert: + dmaap_info: + topic_url: + get_input: ves_thresholdCrossingAlert_publish_url + type: message router + ves-thresholdCrossingAlert-secondary: + dmaap_info: + topic_url: + get_input: ves_thresholdCrossingAlert_secondary_publish_url + type: message router + ves-voicequality: + dmaap_info: + topic_url: + get_input: ves_voicequality_publish_url + type: message router + ves-voicequality-secondary: + dmaap_info: + topic_url: + get_input: ves_voicequality_secondary_publish_url + type: message router + stream_subcribes: {} + auth.method: noAuth + collector.dmaap.streamid: + get_input: collector.dmaap.streamid + collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks + collector.keystore.passwordfile: /opt/app/dcae-certificate/.password + collector.schema.checkflag: 1 + collector.schema.file: {"v1":"./etc/CommonEventFormat_27.2.json","v2":"./etc/CommonEventFormat_27.2.json","v3":"./etc/CommonEventFormat_27.2.json","v4":"./etc/CommonEventFormat_27.2.json","v5":"./etc/CommonEventFormat_28.4.1.json","v7":"./etc/CommonEventFormat_30.json"} + collector.service.port: 8080 + collector.service.secure.port: 8443 + collector.truststore.file.location: /opt/app/dcae-certificate/truststore.jks + collector.truststore.passwordfile: /opt/app/dcae-certificate/.trustpassword + event.transform.flag: 1 + header.authlist: + get_input: header.authlist + tomcat.maxthreads: 200 + docker_config: + interval: 15s + timeout: 1s + type: https + endpoint: /healthcheck + image: + get_input: tag_version + log_info: + get_input: log_directory + dns_name: dcae-ves-collector + replicas: + get_input: replicas + name: dcae-ves-collector diff --git a/dpo/data-formats/ConsulConfig.json b/dpo/data-formats/ConsulConfig.json index ea65522b..89348bfa 100644 --- a/dpo/data-formats/ConsulConfig.json +++ b/dpo/data-formats/ConsulConfig.json @@ -6,7 +6,6 @@ "collector.service.port": "8080", "collector.schema.file": "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\",\"v7\":\"./etc/CommonEventFormat_30.0.1.json\"}", "collector.keystore.passwordfile": "/opt/app/VESCollector/etc/passwordfile", - "collector.inputQueue.maxPending": "8096", "streams_publishes": { "ves-measurement": { "type": "message_router", diff --git a/dpo/spec/vescollector-componentspec.json b/dpo/spec/vescollector-componentspec.json index 4e2eb970..1b47268c 100644 --- a/dpo/spec/vescollector-componentspec.json +++ b/dpo/spec/vescollector-componentspec.json @@ -1,6 +1,6 @@ { "self": { - "version": "1.3.0", + "version": "1.5.0", "name": "dcae-ves-collector", "description": "Collector for receiving VES events through restful interface", "component_type": "docker" @@ -280,14 +280,6 @@ "policy_editable": false, "designer_editable": false }, - { - "name": "collector.inputQueue.maxPending", - "value": 8096, - "description": "Maximum queue limit across domains collector will queue before event is published", - "sourced_at_deployment": false, - "policy_editable": false, - "designer_editable": false - }, { "name": "collector.dmaap.streamid", "value": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary", @@ -299,7 +291,7 @@ { "name": "auth.method", "value": "noAuth", - "description": "Basic Authentication flag; when enabled only secure port will be supported.", + "description": "Property to manage application mode, possible configurations: noAuth - default option - no security (http) , certOnly - auth by certificate (https), basicAuth - auth by basic auth username and password (https),certBasicAuth - auth by certificate and basic auth username / password (https),", "sourced_at_deployment": false, "policy_editable": false, "designer_editable": false @@ -379,13 +371,14 @@ } ], "ports": [ + "8080:8080", "8443:8443" ] }, "artifacts": [ { "type": "docker image", - "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3" + "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:latest" } ] } diff --git a/dpo/tosca_model/schema.yaml b/dpo/tosca_model/schema.yaml index 6c1b2757..c44a4f7e 100644 --- a/dpo/tosca_model/schema.yaml +++ b/dpo/tosca_model/schema.yaml @@ -195,8 +195,6 @@ node_types: properties: docker_collector.dmaap.streamid: type: string - docker_collector.inputQueue.maxPending: - type: string docker_collector.keystore.file.location: type: string docker_collector.keystore.passwordfile: diff --git a/dpo/tosca_model/template.yaml b/dpo/tosca_model/template.yaml index 73b4ad38..2f132e12 100644 --- a/dpo/tosca_model/template.yaml +++ b/dpo/tosca_model/template.yaml @@ -26,7 +26,6 @@ topology_template: type: dcae.nodes.dockerApp.ves properties: docker_collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert - docker_collector.inputQueue.maxPending: '8096' docker_collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks docker_collector.keystore.passwordfile: /opt/app/dcae-certificate/.password docker_collector.schema.checkflag: '1' diff --git a/dpo/tosca_model/translate.yaml b/dpo/tosca_model/translate.yaml index 284f34bf..f6b7a23e 100644 --- a/dpo/tosca_model/translate.yaml +++ b/dpo/tosca_model/translate.yaml @@ -24,8 +24,6 @@ topology_template: inputs: docker_collector.dmaap.streamid: type: string - docker_collector.inputQueue.maxPending: - type: string docker_collector.keystore.file.location: type: string docker_collector.keystore.passwordfile: @@ -105,8 +103,6 @@ topology_template: application_config: collector.dmaap.streamid: get_input: docker_collector.dmaap.streamid - collector.inputQueue.maxPending: - get_input: docker_collector.inputQueue.maxPending collector.keystore.file.location: get_input: docker_collector.keystore.file.location collector.keystore.passwordfile: diff --git a/etc/collector.properties b/etc/collector.properties index 82ba5954..ae15cd98 100755 --- a/etc/collector.properties +++ b/etc/collector.properties @@ -46,15 +46,6 @@ collector.cert.subject.matcher=etc/certSubjectMatcher.properties collector.truststore.file.location=etc/truststore collector.truststore.passwordfile=etc/trustpasswordfile -## Processing -## -## If there's a problem that prevents the collector from processing alarms, -## it's normally better to apply back pressure to the caller than to try to -## buffer beyond a reasonable size limit. With a limit, the server won't crash -## due to being out of memory, and the caller will get a 5xx reply saying the -## server is in trouble. -collector.inputQueue.maxPending=8096 - ## Schema Validation checkflag ## default no validation checkflag (-1) ## If enabled (1) - schemafile location must be specified diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 61bcf4b4..205659c4 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -84,38 +84,14 @@ public class ApplicationSettings { throw new ApplicationException(ex); } } - public void loadPropertiesFromFile() { - try { - properties.load(configurationFileLocation); - } catch (ConfigurationException ex) { - log.error("Cannot load properties cause:", ex); - throw new ApplicationException(ex); - } - } - public Map validAuthorizationCredentials() { return prepareUsersMap(properties.getString("header.authlist", null)); } - private Map prepareUsersMap(@Nullable String allowedUsers) { - return allowedUsers == null ? HashMap.empty() - : List.of(allowedUsers.split("\\|")) - .map(t->t.split(",")) - .toMap(t-> t[0].trim(), t -> t[1].trim()); - } - - private String findOutConfigurationFileLocation(Map parsedArgs) { - return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); - } - public Path configurationFileLocation() { return Paths.get(configurationFileLocation); } - public int maximumAllowedQueuedEvents() { - return properties.getInt("collector.inputQueue.maxPending", 1024 * 4); - } - public boolean jsonSchemaValidationEnabled() { return properties.getInt("collector.schema.checkflag", -1) > 0; } @@ -126,22 +102,8 @@ public class ApplicationSettings { .getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application.")); } - private Map loadJsonSchemas() { - return jsonSchema().toMap().entrySet().stream() - .map(this::readSchemaForVersion) - .collect(HashMap.collector()); - } - - private Tuple2 readSchemaForVersion(java.util.Map.Entry versionToFilePath) { - try { - String schemaContent = new String( - readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); - JsonNode schemaNode = JsonLoader.fromString(schemaContent); - JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); - return Tuple(versionToFilePath.getKey(), schema); - } catch (IOException | ProcessingException e) { - throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); - } + public boolean isVersionSupported(String version){ + return loadedJsonSchemas.containsKey(version); } public int httpPort() { @@ -183,6 +145,7 @@ public class ApplicationSettings { public String exceptionConfigFileLocation() { return properties.getString("exceptionConfig", null); } + public String dMaaPConfigurationFileLocation() { return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); } @@ -204,7 +167,16 @@ public class ApplicationSettings { } } - public void addOrUpdate(String key, String value) { + private void loadPropertiesFromFile() { + try { + properties.load(configurationFileLocation); + } catch (ConfigurationException ex) { + log.error("Cannot load properties cause:", ex); + throw new ApplicationException(ex); + } + } + + private void addOrUpdate(String key, String value) { if (properties.containsKey(key)) { properties.setProperty(key, value); } else { @@ -212,6 +184,35 @@ public class ApplicationSettings { } } + private String findOutConfigurationFileLocation(Map parsedArgs) { + return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties")); + } + + private Map loadJsonSchemas() { + return jsonSchema().toMap().entrySet().stream() + .map(this::readSchemaForVersion) + .collect(HashMap.collector()); + } + + private Tuple2 readSchemaForVersion(java.util.Map.Entry versionToFilePath) { + try { + String schemaContent = new String( + readAllBytes(Paths.get(versionToFilePath.getValue().toString()))); + JsonNode schemaNode = JsonLoader.fromString(schemaContent); + JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode); + return Tuple(versionToFilePath.getKey(), schema); + } catch (IOException | ProcessingException e) { + throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e); + } + } + + private Map prepareUsersMap(@Nullable String allowedUsers) { + return allowedUsers == null ? HashMap.empty() + : List.of(allowedUsers.split("\\|")) + .map(t->t.split(",")) + .toMap(t-> t[0].trim(), t -> t[1].trim()); + } + private JSONObject jsonSchema() { return new JSONObject(properties.getString("collector.schema.file", format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION))); diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java index d658b4aa..e3340820 100644 --- a/src/main/java/org/onap/dcae/VesApplication.java +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -22,14 +22,9 @@ package org.onap.dcae; import io.vavr.collection.Map; import java.nio.file.Paths; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; import org.onap.dcae.common.publishing.EventPublisher; @@ -49,21 +44,16 @@ import org.springframework.context.annotation.Lazy; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class VesApplication { - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input"); private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error"); - private static final int MAX_THREADS = 20; - public static LinkedBlockingQueue fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; private static ConfigLoader configLoader; - private static EventProcessor eventProcessor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static SpringApplication app; private static EventPublisher eventPublisher; private static ScheduledFuture scheduleFeatures; - private static ExecutorService executor; public static void main(String[] args) { app = new SpringApplication(VesApplication.class); @@ -73,7 +63,6 @@ public class VesApplication { app.setAddCommandLineProperties(true); context = app.run(); configLoader.updateConfig(); - } public static void restartApplication() { @@ -89,7 +78,6 @@ public class VesApplication { } private static void init() { - fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); createConfigLoader(); createSchedulePoolExecutor(); createExecutors(); @@ -97,12 +85,6 @@ public class VesApplication { private static void createExecutors() { eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(new EventSender(eventPublisher, properties)); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } } private static void createSchedulePoolExecutor() { @@ -141,12 +123,6 @@ public class VesApplication { return incomingRequestsLogger; } - @Bean - @Qualifier("metricsLog") - public Logger incomingRequestsMetricsLogger() { - return metriclog; - } - @Bean @Qualifier("errorLog") public Logger errorLogger() { @@ -154,8 +130,9 @@ public class VesApplication { } @Bean - public LinkedBlockingQueue inputQueue() { - return fProcessingInputQueue; + @Qualifier("eventSender") + public EventSender eventSender() { + return new EventSender(eventPublisher,properties); } } diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java deleted file mode 100644 index bf3bf70d..00000000 --- a/src/main/java/org/onap/dcae/common/EventProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import org.json.JSONObject; -import org.onap.dcae.VesApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventProcessor implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private EventSender eventSender; - - public EventProcessor(EventSender eventSender) { - this.eventSender = eventSender; - } - - @Override - public void run() { - try { - while (true){ - JSONObject event = VesApplication.fProcessingInputQueue.take(); - log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); - setLoggingContext(event); - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event)); - eventSender.send(event); - log.debug("Message published" + event); - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - Thread.currentThread().interrupt(); - } - } - - private void setLoggingContext(JSONObject event) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 48268d6c..c1002af6 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -20,17 +20,12 @@ */ package org.onap.dcae.common; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; +import com.att.nsa.clock.SaClock; +import com.att.nsa.logging.LoggingContext; +import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.publishing.EventPublisher; import org.slf4j.Logger; @@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory; public class EventSender { - private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private Map streamidHash; - private ApplicationSettings properties; private EventPublisher eventPublisher; - - private static final Type EVENT_LIST_TYPE = new TypeToken>() {}.getType(); + private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final Logger log = LoggerFactory.getLogger(EventSender.class); private static final String EVENT_LITERAL = "event"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { this.eventPublisher = eventPublisher; this.streamidHash = properties.dMaaPStreamsMapping(); - this.properties = properties; - } - public void send(JSONObject event) { - streamidHash.get(getDomain(event)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event)) - .forEach(streamIds -> sendEventsToStreams(event, streamIds)); + public void send(JSONArray arrayOfEvents) { + for (int i = 0; i < arrayOfEvents.length(); i++) { + metriclog.info("EVENT_PUBLISH_START"); + JSONObject object = (JSONObject) arrayOfEvents.get(i); + setLoggingContext(object); + streamidHash.get(getDomain(object)) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object)) + .forEach(streamIds -> sendEventsToStreams(object, streamIds)); + log.debug("Message published" + object); + } + log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); + metriclog.info("EVENT_PUBLISH_END"); } - public static String getDomain(JSONObject event) { + private static String getDomain(JSONObject event) { return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); } private void sendEventsToStreams(JSONObject event, String[] streamIdList) { for (String aStreamIdList : streamIdList) { log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(overrideEvent(event), aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); } } - private JSONObject overrideEvent(JSONObject event) { - JSONObject jsonObject = addCurrentTimeToEvent(event); - if (properties.eventTransformingEnabled()) { - try (FileReader fr = new FileReader("./etc/eventTransform.json")) { - log.info("parse eventTransform.json"); - List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); - parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); - } catch (IOException e) { - log.error(COULD_NOT_FIND_FILE, e); - throw new ApplicationException(COULD_NOT_FIND_FILE, e); - } - } - if (jsonObject.has("VESversion")) - jsonObject.remove("VESversion"); - - log.debug("Modified event:" + jsonObject); - return jsonObject; - } - - private JSONObject addCurrentTimeToEvent(JSONObject event) { - final Date currentTime = new Date(); - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - return event; - } - - private void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { - for (Event eventTransform : eventsTransform) { - JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); - if (configProcessorAdapter.isFilterMet(filterObj)) { - callProcessorsMethod(configProcessorAdapter, eventTransform.processors); - } - } - } - - private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { - for (Processor processor : processors) { - final String functionName = processor.functionName; - final JSONObject args = new JSONObject(processor.args.toString()); - log.info(String.format("functionName==%s | args==%s", functionName, args)); - try { - configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); - } catch (ReflectiveOperationException e) { - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } - } + private void setLoggingContext(JSONObject event) { + LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString()); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event)); } } diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java new file mode 100644 index 00000000..1caa4f18 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/EventUpdater.java @@ -0,0 +1,137 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventUpdater { + + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final String VES_VERSION = "VESversion"; + private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json"; + private static final Type EVENT_LIST_TYPE = new TypeToken>() {}.getType(); + private static final Logger log = LoggerFactory.getLogger(EventSender.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + private static final String EVENT_TRANSFORM = "./etc/eventTransform.json"; + private ApplicationSettings settings; + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + public EventUpdater(ApplicationSettings settings) { + this.settings = settings; + } + + public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){ + if(type.equalsIgnoreCase(EVENT_LIST)){ + return convertEvents(jsonObject, uuid.toString(), version); + } + else { + return convertEvent(jsonObject, uuid.toString(), version); + } + } + + private JSONArray convertEvents(JSONObject jsonObject, + String uuid, String version) { + JSONArray asArrayEvents = new JSONArray(); + + JSONArray events = jsonObject.getJSONArray(EVENT_LIST); + for (int i = 0; i < events.length(); i++) { + JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i)); + event.put(VES_UNIQUE_ID, uuid + "-" + i); + event.put(VES_VERSION, version); + asArrayEvents.put(overrideEvent(event)); + } + return asArrayEvents; + } + + private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) { + jsonObject.put(VES_UNIQUE_ID, uuid); + jsonObject.put(VES_VERSION, version); + return new JSONArray().put(overrideEvent(jsonObject)); + } + + private JSONObject overrideEvent(JSONObject event) { + JSONObject jsonObject = addCurrentTimeToEvent(event); + if (settings.eventTransformingEnabled()) { + try (FileReader fr = new FileReader(EVENT_TRANSFORM)) { + log.info("parse " + EVENT_TRANSFORM + " file"); + List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject))); + } catch (IOException e) { + log.error(COULD_NOT_FIND_FILE, e); + throw new ApplicationException(COULD_NOT_FIND_FILE, e); + } + } + if (jsonObject.has(VES_VERSION)) + jsonObject.remove(VES_VERSION); + log.debug("Modified event:" + jsonObject); + return jsonObject; + } + + private JSONObject addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + return event; + } + + private void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { + for (Processor processor : processors) { + //TODO try to remove refection + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + log.info(String.format("functionName==%s | args==%s", functionName, args)); + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } +} diff --git a/src/main/java/org/onap/dcae/restapi/EventValidator.java b/src/main/java/org/onap/dcae/restapi/EventValidator.java new file mode 100644 index 00000000..f119b507 --- /dev/null +++ b/src/main/java/org/onap/dcae/restapi/EventValidator.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.restapi; + +import static java.util.stream.StreamSupport.stream; + +import com.github.fge.jackson.JsonLoader; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import java.util.Optional; +import org.json.JSONObject; +import org.onap.dcae.ApplicationException; +import org.onap.dcae.ApplicationSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; + +public class EventValidator { + + private static final Logger log = LoggerFactory.getLogger(EventValidator.class); + + private ApplicationSettings applicationSettings; + + public EventValidator(ApplicationSettings applicationSettings) { + this.applicationSettings = applicationSettings; + } + + public Optional> validate(JSONObject jsonObject, String type, String version){ + if (applicationSettings.jsonSchemaValidationEnabled()) { + if (jsonObject.has(type)) { + if (!conformsToSchema(jsonObject, version)) { + return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); + } + } else { + return errorResponse(ApiException.INVALID_JSON_INPUT); + } + } + return Optional.empty(); + } + + private boolean conformsToSchema(JSONObject payload, String version) { + try { + JsonSchema schema = applicationSettings.jsonSchema(version); + ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); + if (report.isSuccess()) { + return true; + } + log.warn("Schema validation failed for event: " + payload); + stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); + return false; + } catch (Exception e) { + throw new ApplicationException("Unable to validate against schema", e); + } + } + + private Optional> errorResponse(ApiException noServerResources) { + return Optional.of(ResponseEntity.status(noServerResources.httpStatusCode) + .body(noServerResources.toJSON().toString())); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java index 9c65619c..77c6802a 100644 --- a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java +++ b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java @@ -21,16 +21,20 @@ package org.onap.dcae.restapi; -import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; - -@Controller +@RestController public class HealthCheckController { + @GetMapping("/") + public String main() { + return "Welcome to VESCollector"; + } + @GetMapping("/healthcheck") public String healthCheck() { - return "hello"; + return "I'm good"; } } diff --git a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java index 60740a80..267db054 100644 --- a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java +++ b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java @@ -31,6 +31,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig{ + @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2) @@ -39,5 +40,4 @@ public class SwaggerConfig{ .paths(PathSelectors.any()) .build(); } - } diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 3102c31c..b18eb7bc 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -21,32 +21,27 @@ package org.onap.dcae.restapi; -import static java.util.stream.StreamSupport.stream; import static org.springframework.http.ResponseEntity.accepted; +import static org.springframework.http.ResponseEntity.badRequest; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.github.fge.jackson.JsonLoader; -import com.github.fge.jsonschema.core.report.ProcessingReport; -import com.github.fge.jsonschema.main.JsonSchema; - +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.http.HttpServletRequest; - import org.json.JSONArray; import org.json.JSONObject; -import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.EventSender; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.EventUpdater; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class VesRestController { - private static final Logger log = LoggerFactory.getLogger(VesRestController.class); - private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString(); - private final ApplicationSettings applicationSettings; - private final LinkedBlockingQueue inputQueue; - private final Logger metricsLog; - private final Logger errorLog; - private final Logger incomingRequestsLogger; + private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'"; + private static final String EVENT_LIST = "eventList"; + private static final String EVENT = "event"; + private final ApplicationSettings settings; + private final Logger requestLogger; + private EventSender eventSender; @Autowired - VesRestController(ApplicationSettings applicationSettings, - @Qualifier("metricsLog") Logger metricsLog, - @Qualifier("errorLog") Logger errorLog, + VesRestController(ApplicationSettings settings, @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger, - @Qualifier("inputQueue") LinkedBlockingQueue inputQueue) { - this.applicationSettings = applicationSettings; - this.metricsLog = metricsLog; - this.errorLog = errorLog; - this.incomingRequestsLogger = incomingRequestsLogger; - this.inputQueue = inputQueue; - } - - @GetMapping("/") - String mainPage() { - return "Welcome to VESCollector"; + @Qualifier("eventSender") EventSender eventSender) { + this.settings = settings; + this.requestLogger = incomingRequestsLogger; + this.eventSender = eventSender; } - //refactor in next iteration - @PostMapping(value = {"/eventListener/v1", - "/eventListener/v1/eventBatch", - "/eventListener/v2", - "/eventListener/v2/eventBatch", - "/eventListener/v3", - "/eventListener/v3/eventBatch", - "/eventListener/v4", - "/eventListener/v4/eventBatch", - "/eventListener/v5", - "/eventListener/v5/eventBatch", - "/eventListener/v7", - "/eventListener/v7/eventBatch"}, consumes = "application/json") - ResponseEntity receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) { - String request = httpServletRequest.getRequestURI(); - String version = extractVersion(request); - - JSONObject jsonObject; - try { - jsonObject = new JSONObject(jsonPayload); - } catch (Exception e) { - log.error(INVALID_JSON); - return ResponseEntity.badRequest().body(INVALID_JSON); - } - - String uuid = setUpECOMPLoggingForRequest(); - incomingRequestsLogger.info(String.format( - "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'", - jsonObject, uuid, version, httpServletRequest.getRemoteHost())); - - if (applicationSettings.jsonSchemaValidationEnabled()) { - if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) { - if (!conformsToSchema(jsonObject, version)) { - return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED); - } - } else { - return errorResponse(ApiException.INVALID_JSON_INPUT); - } + @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json") + ResponseEntity event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(event, version, request, EVENT); } + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); + } - JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version); - if (!putEventsOnProcessingQueue(commonlyFormatted)) { - errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES); - return errorResponse(ApiException.NO_SERVER_RESOURCES); + @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json") + ResponseEntity events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) { + if (settings.isVersionSupported(version)) { + return process(events, version, request, EVENT_LIST); } - return accepted() - .contentType(MediaType.APPLICATION_JSON) - .body("Accepted"); + return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version)); } - private String extractVersion(String httpServletRequest) { - return httpServletRequest.split("/")[2]; - } + private ResponseEntity process(String events, String version, HttpServletRequest request, String type) { - private ResponseEntity errorResponse(ApiException noServerResources) { - return ResponseEntity.status(noServerResources.httpStatusCode) - .body(noServerResources.toJSON().toString()); - } + JSONObject jsonObject = new JSONObject(events); - private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) { - for (int i = 0; i < arrayOfEvents.length(); i++) { - metricsLog.info("EVENT_PUBLISH_START"); - if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) { - return false; - } - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metricsLog.info("EVENT_PUBLISH_END"); - return true; - } + EventValidator eventValidator = new EventValidator(settings); + Optional> validationResult = eventValidator.validate(jsonObject, type, version); - private boolean conformsToSchema(JSONObject payload, String version) { - try { - JsonSchema schema = applicationSettings.jsonSchema(version); - ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString())); - if (!report.isSuccess()) { - log.warn("Schema validation failed for event: " + payload); - stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage())); - return false; - } - return report.isSuccess(); - } catch (Exception e) { - throw new ApplicationException("Unable to validate against schema", e); + if (validationResult.isPresent()){ + return validationResult.get(); } + JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type); + eventSender.send(arrayOfEvents); + // TODO call service and return status, replace CambriaClient, split event to single object and list of them + return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted"); } - private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request, - String uuid, String version) { - JSONArray asArrayEvents = new JSONArray(); - String vesUniqueIdKey = "VESuniqueId"; - String vesVersionKey = "VESversion"; - if (isBatchRequest(request)) { - JSONArray events = jsonObject.getJSONArray("eventList"); - for (int i = 0; i < events.length(); i++) { - JSONObject event = new JSONObject().put("event", events.getJSONObject(i)); - event.put(vesUniqueIdKey, uuid + "-" + i); - event.put(vesVersionKey, version); - asArrayEvents.put(event); - } - } else { - jsonObject.put(vesUniqueIdKey, uuid); - jsonObject.put(vesVersionKey, version); - asArrayEvents = new JSONArray().put(jsonObject); - } - return asArrayEvents; + private UUID generateUUID(String version, String uri, JSONObject jsonObject) { + UUID uuid = UUID.randomUUID(); + setUpECOMPLoggingForRequest(uuid); + requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri)); + return uuid; } - private static String setUpECOMPLoggingForRequest() { - final UUID uuid = UUID.randomUUID(); + private static void setUpECOMPLoggingForRequest(UUID uuid) { LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - return uuid.toString(); - } - - private static boolean isBatchRequest(String request) { - return request.contains("eventBatch"); } } \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java index 7059c4e5..c3e2a5de 100644 --- a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java +++ b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java @@ -52,5 +52,4 @@ public class WebMvcConfig extends WebMvcConfigurationSupport { resolver.setSuffix(".html"); return resolver; } - } diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java index 60287aef..6b0023f8 100644 --- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java +++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java @@ -234,25 +234,6 @@ public class ApplicationSettingsTest { assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation); } - @Test - public void shouldReturnMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration("collector.inputQueue.maxPending=10000") - .maximumAllowedQueuedEvents(); - - // then - assertEquals(10000, maximumAllowedQueuedEvents); - } - - @Test - public void shouldReturnDefaultMaximumAllowedQueuedEvents() throws IOException { - // when - int maximumAllowedQueuedEvents = fromTemporaryConfiguration().maximumAllowedQueuedEvents(); - - // then - assertEquals(1024 * 4, maximumAllowedQueuedEvents); - } - @Test public void shouldTellIfSchemaValidationIsEnabled() throws IOException { // when diff --git a/src/test/java/org/onap/dcae/TLSTestBase.java b/src/test/java/org/onap/dcae/TLSTestBase.java index 4dada129..df10ead9 100644 --- a/src/test/java/org/onap/dcae/TLSTestBase.java +++ b/src/test/java/org/onap/dcae/TLSTestBase.java @@ -24,6 +24,7 @@ package org.onap.dcae; import org.json.JSONObject; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; +import org.onap.dcae.common.EventSender; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; @@ -69,8 +70,8 @@ public class TLSTestBase { protected abstract class TestClassBase { @MockBean - @Qualifier("inputQueue") - protected LinkedBlockingQueue queue; + @Qualifier("eventSender") + protected EventSender eventSender; @LocalServerPort private int port; diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java index aba3c2a9..f49d3cd8 100644 --- a/src/test/java/org/onap/dcae/common/EventSenderTest.java +++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import io.vavr.collection.HashMap; import io.vavr.collection.Map; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +40,6 @@ import org.onap.dcae.common.publishing.EventPublisher; @RunWith(MockitoJUnitRunner.Silent.class) public class EventSenderTest { - private String event = "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1544016106000000,\"eventId\":\"fault33\",\"timeZoneOffset\":\"UTC+00.00\",\"priority\":\"Normal\",\"version\":\"4.0.1\",\"nfVendorName\":\"Ericsson\",\"reportingEntityName\":\"1\",\"sequence\":1,\"domain\":\"fault\",\"lastEpochMicrosec\":1544016106000000,\"eventName\":\"Fault_KeyFileFault\",\"vesEventListenerVersion\":\"7.0.1\",\"sourceName\":\"1\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"KeyFileFault\",\"faultFieldsVersion\":\"4.0\",\"eventCategory\":\"PROCESSINGERRORALARM\",\"specificProblem\":\"License Key File Fault_1\",\"alarmAdditionalInformation\":{\"probableCause\":\"ConfigurationOrCustomizationError\",\"additionalText\":\"test_1\",\"source\":\"ManagedElement=1,SystemFunctions=1,Lm=1\"},\"eventSourceType\":\"Lm\",\"vfStatus\":\"Active\"}}}\n"; @Mock @@ -54,7 +54,10 @@ public class EventSenderTest { public void shouldntSendEventWhenStreamIdsIsEmpty() { when(settings.dMaaPStreamsMapping()).thenReturn(HashMap.empty()); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher,never()).sendEvent(any(),any()); } @@ -63,7 +66,10 @@ public class EventSenderTest { Map streams = HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}); when(settings.dMaaPStreamsMapping()).thenReturn(streams); eventSender = new EventSender(eventPublisher, settings ); - eventSender.send(new JSONObject(event)); + JSONObject jsonObject = new JSONObject(event); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(jsonObject); + eventSender.send(jsonArray); verify(eventPublisher, times(2)).sendEvent(any(),any()); } } \ No newline at end of file diff --git a/src/test/resources/controller-config_dmaap_ip.json b/src/test/resources/controller-config_dmaap_ip.json index 1cc6576b..f148db55 100644 --- a/src/test/resources/controller-config_dmaap_ip.json +++ b/src/test/resources/controller-config_dmaap_ip.json @@ -1,6 +1,5 @@ { "auth.method": "noAuth", - "collector.inputQueue.maxPending": 8096, "collector.schema.checkflag": 1, "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks", "tomcat.maxthreads": "200", diff --git a/src/test/resources/controller-config_singleline_ip.json b/src/test/resources/controller-config_singleline_ip.json index c3a8d067..a3974e0f 100644 --- a/src/test/resources/controller-config_singleline_ip.json +++ b/src/test/resources/controller-config_singleline_ip.json @@ -5,7 +5,6 @@ "tomcat.maxthreads": "200", "collector.dmaap.streamid": "fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling", "streams_subscribes": {}, - "collector.inputQueue.maxPending": "8096", "streams_publishes": { "ves-mobileflow": { "type": "message_router", diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties index 9450067a..0916211f 100644 --- a/src/test/resources/test_collector_ip_op.properties +++ b/src/test/resources/test_collector_ip_op.properties @@ -9,7 +9,6 @@ collector.dmaapfile=./etc/DmaapConfig.json auth.method=noAuth header.authlist=sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy event.transform.flag=1 -collector.inputQueue.maxPending = 8096 streams_subscribes = {} services_calls = {} tomcat.maxthreads = 200 -- cgit 1.2.3-korg