aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2019-06-14 22:15:52 +0000
committerGerrit Code Review <gerrit@onap.org>2019-06-14 22:15:52 +0000
commit639686e01af925f2be3bd0d3cd187be7df0b2cf8 (patch)
tree17a6cab254276f849ce61c3e1483df65eec00029
parentc3a2fd25011e1777ef48d79632170c0ed913f928 (diff)
parent3c3c7ad09c02852cd0b4db03ecc9cc5c429cab08 (diff)
Merge "VES Collector - Event Ordering"
-rw-r--r--dpo/blueprint/blueprint_ves.yaml387
-rw-r--r--dpo/data-formats/ConsulConfig.json1
-rw-r--r--dpo/spec/vescollector-componentspec.json15
-rw-r--r--dpo/tosca_model/schema.yaml2
-rw-r--r--dpo/tosca_model/template.yaml1
-rw-r--r--dpo/tosca_model/translate.yaml4
-rwxr-xr-xetc/collector.properties9
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java83
-rw-r--r--src/main/java/org/onap/dcae/VesApplication.java29
-rw-r--r--src/main/java/org/onap/dcae/common/EventProcessor.java61
-rw-r--r--src/main/java/org/onap/dcae/common/EventSender.java94
-rw-r--r--src/main/java/org/onap/dcae/common/EventUpdater.java137
-rw-r--r--src/main/java/org/onap/dcae/restapi/EventValidator.java78
-rw-r--r--src/main/java/org/onap/dcae/restapi/HealthCheckController.java12
-rw-r--r--src/main/java/org/onap/dcae/restapi/SwaggerConfig.java2
-rw-r--r--src/main/java/org/onap/dcae/restapi/VesRestController.java179
-rw-r--r--src/main/java/org/onap/dcae/restapi/WebMvcConfig.java1
-rw-r--r--src/test/java/org/onap/dcae/ApplicationSettingsTest.java19
-rw-r--r--src/test/java/org/onap/dcae/TLSTestBase.java5
-rw-r--r--src/test/java/org/onap/dcae/common/EventSenderTest.java12
-rw-r--r--src/test/resources/controller-config_dmaap_ip.json1
-rw-r--r--src/test/resources/controller-config_singleline_ip.json1
-rw-r--r--src/test/resources/test_collector_ip_op.properties1
23 files changed, 581 insertions, 553 deletions
diff --git a/dpo/blueprint/blueprint_ves.yaml b/dpo/blueprint/blueprint_ves.yaml
index 2dbc5a66..43158f1c 100644
--- a/dpo/blueprint/blueprint_ves.yaml
+++ b/dpo/blueprint/blueprint_ves.yaml
@@ -15,169 +15,240 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-tosca_definitions_version: cloudify_dsl_1_3
-
-description: >
- This handcrafted blueprint will install the ves collector and provision the needed message router topics. This blueprint can be used to verify that a platform installation is operational and working correctly.
+tosca_definitions_version: cloudify_dsl_1_3
imports:
-- http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
-- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/docker/2.2.0/node-type.yaml
-- https://NEXUS_REPO_HOST:8443/repository/NEXUS_RAW/type_files/relationship/1.0.0/node-type.yaml
-- http://NEXUS_REPO_HOST:8081/repository/NEXUS_RAW/type_files/dmaap/dmaap_mr.yaml
-
+ - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
+ - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.5/k8splugin_types.yaml
+ - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/dcaepolicyplugin/2.3.0/dcaepolicyplugin_types.yaml
inputs:
-
- service_id:
- description: Unique id used for an instance of this DCAE service. Use deployment id
- default: 'foobar'
- location_id:
- default: 'solutioning-central'
- docker_host_override:
- default: 'component_dockerhost'
-
- topic00_aaf_username:
- topic00_aaf_password:
- topic00_location:
- default: mtc5
- topic00_client_role:
- default: com.att.dcae.member
-
- topic01_aaf_username:
- topic01_aaf_password:
- topic01_location:
- default: mtc5
- topic01_client_role:
- default: com.att.dcae.member
-
- topic02_aaf_username:
- topic02_aaf_password:
- topic02_location:
- default: mtc5
- topic02_client_role:
- default: com.att.dcae.member
-
- topic03_aaf_username:
- topic03_aaf_password:
- topic03_location:
- default: mtc5
- topic03_client_role:
- default: com.att.dcae.member
-
+ collector.dmaap.streamid:
+ type: string
+ default: "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary"
+ external_port:
+ type: string
+ description: Kubernetes node port on which collector is exposed
+ default: "30235"
+ header.authlist:
+ type: string
+ default: "sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy"
+ log_directory:
+ type: string
+ default: "/opt/app/VESCollector/logs"
+ replicas:
+ type: integer
+ description: number of instances
+ default: 1
+ tag_version:
+ type: string
+ default: "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3"
+ ves_fault_publish_url:
+ type: string
+ ves_fault_secondary_publish_url:
+ type: string
+ ves_heartbeat_publish_url:
+ type: string
+ ves_heartbeat_secondary_publish_url:
+ type: string
+ ves_measurement_publish_url:
+ type: string
+ ves_measurement_secondary_publish_url:
+ type: string
+ ves_mobileflow_publish_url:
+ type: string
+ ves_mobileflow_secondary_publish_url:
+ type: string
+ ves_notification_publish_url:
+ type: string
+ ves_notification_secondary_publish_url:
+ type: string
+ ves_other_publish_url:
+ type: string
+ ves_other_secondary_publish_url:
+ type: string
+ ves_pnfRegistration_publish_url:
+ type: string
+ ves_pnfRegistration_secondary_publish_url:
+ type: string
+ ves_sipsignaling_publish_url:
+ type: string
+ ves_sipsignaling_secondary_publish_url:
+ type: string
+ ves_statechange_publish_url:
+ type: string
+ ves_statechange_secondary_publish_url:
+ type: string
+ ves_syslog_publish_url:
+ type: string
+ ves_syslog_secondary_publish_url:
+ type: string
+ ves_thresholdCrossingAlert_publish_url:
+ type: string
+ ves_thresholdCrossingAlert_secondary_publish_url:
+ type: string
+ ves_voicequality_publish_url:
+ type: string
+ ves_voicequality_secondary_publish_url:
+ type: string
node_templates:
-
- topic00:
- type: dcae.nodes.Topic
- properties:
- topic_name: sec-fault-unsecure
-
- topic01:
- type: dcae.nodes.Topic
- properties:
- topic_name: sec-measurement
-
- topic02:
- type: dcae.nodes.Topic
- properties:
- topic_name: sec-measurement-unsecure
-
- topic03:
- type: dcae.nodes.Topic
- properties:
- topic_name: sec-fault
-
- component00:
- type: dcae.nodes.DockerContainerForComponentsUsingDmaap
- properties:
- service_component_type:
- 'dcae-controller-ves-collector'
- service_id:
- { get_input: service_id }
- location_id:
- { get_input: location_id }
- application_config:
- collector.keystore.passwordfile: "/opt/app/dcae-certificate/.password"
- collector.service.secure.port: -1
- tomcat.maxthreads: '200'
- collector.keystore.file.location: "/opt/app/dcae-certificate/keystore.jks"
- auth.method: "noAuth"
- collector.service.port: 8080
- streams_publishes:
- sec_fault_unsecure:
- aaf_password: { get_input: topic00_aaf_password }
- dmaap_info: "<<topic00>>"
- type: message_router
- aaf_username: { get_input: topic00_aaf_username }
- sec_measurement:
- aaf_password: { get_input: topic01_aaf_password }
- aaf_username: { get_input: topic01_aaf_username }
- type: message_router
- dmaap_info: "<<topic01>>"
- sec_measurement_unsecure:
- aaf_password: { get_input: topic02_aaf_password }
- aaf_username: { get_input: topic02_aaf_username }
- dmaap_info: "<<topic02>>"
- type: message_router
- sec_fault:
- aaf_password: { get_input: topic03_aaf_password }
- aaf_username: { get_input: topic03_aaf_username }
- dmaap_info: "<<topic03>>"
- type: message_router
- services_calls: {}
- collector.schema.checkflag: 1
- collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert
- header.authlist: userid1,base64encodepwd1|userid2,base64encodepwd2
- streams_subscribes: {}
- collector.inputQueue.maxPending: 8096
- collector.schema.file: "./etc/CommonEventFormat_27.2.json"
- image:
- NEXUS_REPO_HOST:18443/dcae-dev-raw/dcae-controller-ves-collector:1.1.3
- docker_config:
- healthcheck:
- type: "http"
- interval: "15s"
- timeout: "1s"
- endpoint: "/"
- streams_publishes:
- - name: topic00
- location: { get_input: topic00_location }
- client_role: { get_input: topic00_client_role }
- type: message_router
- - name: topic01
- location: { get_input: topic01_location }
- client_role: { get_input: topic01_client_role }
- type: message_router
- - name: topic02
- location: { get_input: topic02_location }
- client_role: { get_input: topic02_client_role }
- type: message_router
- - name: topic03
- location: { get_input: topic03_location }
- client_role: { get_input: topic03_client_role }
- type: message_router
- streams_subscribes: []
- relationships:
- - type: dcae.relationships.component_contained_in
- target: docker_host
- - type: dcae.relationships.publish_events
- target: topic00
- - type: dcae.relationships.publish_events
- target: topic01
- - type: dcae.relationships.publish_events
- target: topic02
- - type: dcae.relationships.publish_events
- target: topic03
+ dcae-ves-collector:
+ type: dcae.nodes.ContainerizedPlatformComponent
interfaces:
cloudify.interfaces.lifecycle:
- stop:
+ start:
inputs:
- cleanup_image:
- True
-
- docker_host:
- type: dcae.nodes.SelectedDockerHost
+ ports:
+ - concat: ["8443:", {get_input: external_port }]
properties:
- location_id:
- { get_input: location_id }
- docker_host_override:
- { get_input: docker_host_override }
+ application_config:
+ service_calls: []
+ stream_publishes:
+ ves-fault:
+ dmaap_info:
+ topic_url:
+ get_input: ves_fault_publish_url
+ type: message router
+ ves-fault-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_fault_secondary_publish_url
+ type: message router
+ ves-heartbeat:
+ dmaap_info:
+ topic_url:
+ get_input: ves_heartbeat_publish_url
+ type: message router
+ ves-heartbeat-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_heartbeat_secondary_publish_url
+ type: message router
+ ves-measurement:
+ dmaap_info:
+ topic_url:
+ get_input: ves_measurement_publish_url
+ type: message router
+ ves-measurement-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_measurement_secondary_publish_url
+ type: message router
+ ves-mobileflow:
+ dmaap_info:
+ topic_url:
+ get_input: ves_mobileflow_publish_url
+ type: message router
+ ves-mobileflow-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_mobileflow_secondary_publish_url
+ type: message router
+ ves-notification:
+ dmaap_info:
+ topic_url:
+ get_input: ves_notification_publish_url
+ type: message router
+ ves-notification-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_notification_secondary_publish_url
+ type: message router
+ ves-other:
+ dmaap_info:
+ topic_url:
+ get_input: ves_other_publish_url
+ type: message router
+ ves-other-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_other_secondary_publish_url
+ type: message router
+ ves-pnfRegistration:
+ dmaap_info:
+ topic_url:
+ get_input: ves_pnfRegistration_publish_url
+ type: message router
+ ves-pnfRegistration-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_pnfRegistration_secondary_publish_url
+ type: message router
+ ves-sipsignaling:
+ dmaap_info:
+ topic_url:
+ get_input: ves_sipsignaling_publish_url
+ type: message router
+ ves-sipsignaling-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_sipsignaling_secondary_publish_url
+ type: message router
+ ves-statechange:
+ dmaap_info:
+ topic_url:
+ get_input: ves_statechange_publish_url
+ type: message router
+ ves-statechange-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_statechange_secondary_publish_url
+ type: message router
+ ves-syslog:
+ dmaap_info:
+ topic_url:
+ get_input: ves_syslog_publish_url
+ type: message router
+ ves-syslog-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_syslog_secondary_publish_url
+ type: message router
+ ves-thresholdCrossingAlert:
+ dmaap_info:
+ topic_url:
+ get_input: ves_thresholdCrossingAlert_publish_url
+ type: message router
+ ves-thresholdCrossingAlert-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_thresholdCrossingAlert_secondary_publish_url
+ type: message router
+ ves-voicequality:
+ dmaap_info:
+ topic_url:
+ get_input: ves_voicequality_publish_url
+ type: message router
+ ves-voicequality-secondary:
+ dmaap_info:
+ topic_url:
+ get_input: ves_voicequality_secondary_publish_url
+ type: message router
+ stream_subcribes: {}
+ auth.method: noAuth
+ collector.dmaap.streamid:
+ get_input: collector.dmaap.streamid
+ collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks
+ collector.keystore.passwordfile: /opt/app/dcae-certificate/.password
+ collector.schema.checkflag: 1
+ collector.schema.file: {"v1":"./etc/CommonEventFormat_27.2.json","v2":"./etc/CommonEventFormat_27.2.json","v3":"./etc/CommonEventFormat_27.2.json","v4":"./etc/CommonEventFormat_27.2.json","v5":"./etc/CommonEventFormat_28.4.1.json","v7":"./etc/CommonEventFormat_30.json"}
+ collector.service.port: 8080
+ collector.service.secure.port: 8443
+ collector.truststore.file.location: /opt/app/dcae-certificate/truststore.jks
+ collector.truststore.passwordfile: /opt/app/dcae-certificate/.trustpassword
+ event.transform.flag: 1
+ header.authlist:
+ get_input: header.authlist
+ tomcat.maxthreads: 200
+ docker_config:
+ interval: 15s
+ timeout: 1s
+ type: https
+ endpoint: /healthcheck
+ image:
+ get_input: tag_version
+ log_info:
+ get_input: log_directory
+ dns_name: dcae-ves-collector
+ replicas:
+ get_input: replicas
+ name: dcae-ves-collector
diff --git a/dpo/data-formats/ConsulConfig.json b/dpo/data-formats/ConsulConfig.json
index ea65522b..89348bfa 100644
--- a/dpo/data-formats/ConsulConfig.json
+++ b/dpo/data-formats/ConsulConfig.json
@@ -6,7 +6,6 @@
"collector.service.port": "8080",
"collector.schema.file": "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\",\"v7\":\"./etc/CommonEventFormat_30.0.1.json\"}",
"collector.keystore.passwordfile": "/opt/app/VESCollector/etc/passwordfile",
- "collector.inputQueue.maxPending": "8096",
"streams_publishes": {
"ves-measurement": {
"type": "message_router",
diff --git a/dpo/spec/vescollector-componentspec.json b/dpo/spec/vescollector-componentspec.json
index 4e2eb970..1b47268c 100644
--- a/dpo/spec/vescollector-componentspec.json
+++ b/dpo/spec/vescollector-componentspec.json
@@ -1,6 +1,6 @@
{
"self": {
- "version": "1.3.0",
+ "version": "1.5.0",
"name": "dcae-ves-collector",
"description": "Collector for receiving VES events through restful interface",
"component_type": "docker"
@@ -281,14 +281,6 @@
"designer_editable": false
},
{
- "name": "collector.inputQueue.maxPending",
- "value": 8096,
- "description": "Maximum queue limit across domains collector will queue before event is published",
- "sourced_at_deployment": false,
- "policy_editable": false,
- "designer_editable": false
- },
- {
"name": "collector.dmaap.streamid",
"value": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary|notification=ves-notification,ves-notification-secondary|pnfRegistration=ves-pnfRegistration,ves-pnfRegistration-secondary",
"description": "domain-to-streamid mapping used by VESCollector to distributes events based on domain. Both primary and secondary config_key are included for resilency (multiple streamid can be included commma separated). The streamids MUST match to topic config_keys. For single site without resiliency deployment - configkeys with -secondary suffix can be removed",
@@ -299,7 +291,7 @@
{
"name": "auth.method",
"value": "noAuth",
- "description": "Basic Authentication flag; when enabled only secure port will be supported.",
+ "description": "Property to manage application mode, possible configurations: noAuth - default option - no security (http) , certOnly - auth by certificate (https), basicAuth - auth by basic auth username and password (https),certBasicAuth - auth by certificate and basic auth username / password (https),",
"sourced_at_deployment": false,
"policy_editable": false,
"designer_editable": false
@@ -379,13 +371,14 @@
}
],
"ports": [
+ "8080:8080",
"8443:8443"
]
},
"artifacts": [
{
"type": "docker image",
- "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.3"
+ "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:latest"
}
]
}
diff --git a/dpo/tosca_model/schema.yaml b/dpo/tosca_model/schema.yaml
index 6c1b2757..c44a4f7e 100644
--- a/dpo/tosca_model/schema.yaml
+++ b/dpo/tosca_model/schema.yaml
@@ -195,8 +195,6 @@ node_types:
properties:
docker_collector.dmaap.streamid:
type: string
- docker_collector.inputQueue.maxPending:
- type: string
docker_collector.keystore.file.location:
type: string
docker_collector.keystore.passwordfile:
diff --git a/dpo/tosca_model/template.yaml b/dpo/tosca_model/template.yaml
index 73b4ad38..2f132e12 100644
--- a/dpo/tosca_model/template.yaml
+++ b/dpo/tosca_model/template.yaml
@@ -26,7 +26,6 @@ topology_template:
type: dcae.nodes.dockerApp.ves
properties:
docker_collector.dmaap.streamid: fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert
- docker_collector.inputQueue.maxPending: '8096'
docker_collector.keystore.file.location: /opt/app/dcae-certificate/keystore.jks
docker_collector.keystore.passwordfile: /opt/app/dcae-certificate/.password
docker_collector.schema.checkflag: '1'
diff --git a/dpo/tosca_model/translate.yaml b/dpo/tosca_model/translate.yaml
index 284f34bf..f6b7a23e 100644
--- a/dpo/tosca_model/translate.yaml
+++ b/dpo/tosca_model/translate.yaml
@@ -24,8 +24,6 @@ topology_template:
inputs:
docker_collector.dmaap.streamid:
type: string
- docker_collector.inputQueue.maxPending:
- type: string
docker_collector.keystore.file.location:
type: string
docker_collector.keystore.passwordfile:
@@ -105,8 +103,6 @@ topology_template:
application_config:
collector.dmaap.streamid:
get_input: docker_collector.dmaap.streamid
- collector.inputQueue.maxPending:
- get_input: docker_collector.inputQueue.maxPending
collector.keystore.file.location:
get_input: docker_collector.keystore.file.location
collector.keystore.passwordfile:
diff --git a/etc/collector.properties b/etc/collector.properties
index 82ba5954..ae15cd98 100755
--- a/etc/collector.properties
+++ b/etc/collector.properties
@@ -46,15 +46,6 @@ collector.cert.subject.matcher=etc/certSubjectMatcher.properties
collector.truststore.file.location=etc/truststore
collector.truststore.passwordfile=etc/trustpasswordfile
-## Processing
-##
-## If there's a problem that prevents the collector from processing alarms,
-## it's normally better to apply back pressure to the caller than to try to
-## buffer beyond a reasonable size limit. With a limit, the server won't crash
-## due to being out of memory, and the caller will get a 5xx reply saying the
-## server is in trouble.
-collector.inputQueue.maxPending=8096
-
## Schema Validation checkflag
## default no validation checkflag (-1)
## If enabled (1) - schemafile location must be specified
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index 61bcf4b4..205659c4 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -84,38 +84,14 @@ public class ApplicationSettings {
throw new ApplicationException(ex);
}
}
- public void loadPropertiesFromFile() {
- try {
- properties.load(configurationFileLocation);
- } catch (ConfigurationException ex) {
- log.error("Cannot load properties cause:", ex);
- throw new ApplicationException(ex);
- }
- }
-
public Map<String, String> validAuthorizationCredentials() {
return prepareUsersMap(properties.getString("header.authlist", null));
}
- private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) {
- return allowedUsers == null ? HashMap.empty()
- : List.of(allowedUsers.split("\\|"))
- .map(t->t.split(","))
- .toMap(t-> t[0].trim(), t -> t[1].trim());
- }
-
- private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) {
- return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties"));
- }
-
public Path configurationFileLocation() {
return Paths.get(configurationFileLocation);
}
- public int maximumAllowedQueuedEvents() {
- return properties.getInt("collector.inputQueue.maxPending", 1024 * 4);
- }
-
public boolean jsonSchemaValidationEnabled() {
return properties.getInt("collector.schema.checkflag", -1) > 0;
}
@@ -126,22 +102,8 @@ public class ApplicationSettings {
.getOrElseThrow(() -> new IllegalStateException("No fallback schema present in application."));
}
- private Map<String, JsonSchema> loadJsonSchemas() {
- return jsonSchema().toMap().entrySet().stream()
- .map(this::readSchemaForVersion)
- .collect(HashMap.collector());
- }
-
- private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
- try {
- String schemaContent = new String(
- readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
- JsonNode schemaNode = JsonLoader.fromString(schemaContent);
- JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
- return Tuple(versionToFilePath.getKey(), schema);
- } catch (IOException | ProcessingException e) {
- throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e);
- }
+ public boolean isVersionSupported(String version){
+ return loadedJsonSchemas.containsKey(version);
}
public int httpPort() {
@@ -183,6 +145,7 @@ public class ApplicationSettings {
public String exceptionConfigFileLocation() {
return properties.getString("exceptionConfig", null);
}
+
public String dMaaPConfigurationFileLocation() {
return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json"));
}
@@ -204,7 +167,16 @@ public class ApplicationSettings {
}
}
- public void addOrUpdate(String key, String value) {
+ private void loadPropertiesFromFile() {
+ try {
+ properties.load(configurationFileLocation);
+ } catch (ConfigurationException ex) {
+ log.error("Cannot load properties cause:", ex);
+ throw new ApplicationException(ex);
+ }
+ }
+
+ private void addOrUpdate(String key, String value) {
if (properties.containsKey(key)) {
properties.setProperty(key, value);
} else {
@@ -212,6 +184,35 @@ public class ApplicationSettings {
}
}
+ private String findOutConfigurationFileLocation(Map<String, String> parsedArgs) {
+ return prependWithUserDirOnRelative(parsedArgs.get("c").getOrElse("etc/collector.properties"));
+ }
+
+ private Map<String, JsonSchema> loadJsonSchemas() {
+ return jsonSchema().toMap().entrySet().stream()
+ .map(this::readSchemaForVersion)
+ .collect(HashMap.collector());
+ }
+
+ private Tuple2<String, JsonSchema> readSchemaForVersion(java.util.Map.Entry<String, Object> versionToFilePath) {
+ try {
+ String schemaContent = new String(
+ readAllBytes(Paths.get(versionToFilePath.getValue().toString())));
+ JsonNode schemaNode = JsonLoader.fromString(schemaContent);
+ JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
+ return Tuple(versionToFilePath.getKey(), schema);
+ } catch (IOException | ProcessingException e) {
+ throw new ApplicationException("Could not read schema from path: " + versionToFilePath.getValue(), e);
+ }
+ }
+
+ private Map<String, String> prepareUsersMap(@Nullable String allowedUsers) {
+ return allowedUsers == null ? HashMap.empty()
+ : List.of(allowedUsers.split("\\|"))
+ .map(t->t.split(","))
+ .toMap(t-> t[0].trim(), t -> t[1].trim());
+ }
+
private JSONObject jsonSchema() {
return new JSONObject(properties.getString("collector.schema.file",
format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)));
diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java
index d658b4aa..e3340820 100644
--- a/src/main/java/org/onap/dcae/VesApplication.java
+++ b/src/main/java/org/onap/dcae/VesApplication.java
@@ -22,14 +22,9 @@ package org.onap.dcae;
import io.vavr.collection.Map;
import java.nio.file.Paths;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.json.JSONObject;
-import org.onap.dcae.common.EventProcessor;
import org.onap.dcae.common.EventSender;
import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
import org.onap.dcae.common.publishing.EventPublisher;
@@ -49,21 +44,16 @@ import org.springframework.context.annotation.Lazy;
@SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
public class VesApplication {
- private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input");
private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output");
private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error");
- private static final int MAX_THREADS = 20;
- public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
private static ApplicationSettings properties;
private static ConfigurableApplicationContext context;
private static ConfigLoader configLoader;
- private static EventProcessor eventProcessor;
private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private static SpringApplication app;
private static EventPublisher eventPublisher;
private static ScheduledFuture<?> scheduleFeatures;
- private static ExecutorService executor;
public static void main(String[] args) {
app = new SpringApplication(VesApplication.class);
@@ -73,7 +63,6 @@ public class VesApplication {
app.setAddCommandLineProperties(true);
context = app.run();
configLoader.updateConfig();
-
}
public static void restartApplication() {
@@ -89,7 +78,6 @@ public class VesApplication {
}
private static void init() {
- fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
createConfigLoader();
createSchedulePoolExecutor();
createExecutors();
@@ -97,12 +85,6 @@ public class VesApplication {
private static void createExecutors() {
eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig());
- eventProcessor = new EventProcessor(new EventSender(eventPublisher, properties));
-
- executor = Executors.newFixedThreadPool(MAX_THREADS);
- for (int i = 0; i < MAX_THREADS; ++i) {
- executor.execute(eventProcessor);
- }
}
private static void createSchedulePoolExecutor() {
@@ -142,20 +124,15 @@ public class VesApplication {
}
@Bean
- @Qualifier("metricsLog")
- public Logger incomingRequestsMetricsLogger() {
- return metriclog;
- }
-
- @Bean
@Qualifier("errorLog")
public Logger errorLogger() {
return errorLog;
}
@Bean
- public LinkedBlockingQueue<JSONObject> inputQueue() {
- return fProcessingInputQueue;
+ @Qualifier("eventSender")
+ public EventSender eventSender() {
+ return new EventSender(eventPublisher,properties);
}
}
diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java
deleted file mode 100644
index bf3bf70d..00000000
--- a/src/main/java/org/onap/dcae/common/EventProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import org.json.JSONObject;
-import org.onap.dcae.VesApplication;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventProcessor implements Runnable {
-
- private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
- private EventSender eventSender;
-
- public EventProcessor(EventSender eventSender) {
- this.eventSender = eventSender;
- }
-
- @Override
- public void run() {
- try {
- while (true){
- JSONObject event = VesApplication.fProcessingInputQueue.take();
- log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);
- setLoggingContext(event);
- log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + eventSender.getDomain(event));
- eventSender.send(event);
- log.debug("Message published" + event);
- }
- } catch (InterruptedException e) {
- log.error("EventProcessor InterruptedException" + e.getMessage());
- Thread.currentThread().interrupt();
- }
- }
-
- private void setLoggingContext(JSONObject event) {
- LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get("VESuniqueId").toString());
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- }
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java
index 48268d6c..c1002af6 100644
--- a/src/main/java/org/onap/dcae/common/EventSender.java
+++ b/src/main/java/org/onap/dcae/common/EventSender.java
@@ -20,17 +20,12 @@
*/
package org.onap.dcae.common;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
+import org.json.JSONArray;
import org.json.JSONObject;
-import org.onap.dcae.ApplicationException;
import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.common.publishing.EventPublisher;
import org.slf4j.Logger;
@@ -38,88 +33,47 @@ import org.slf4j.LoggerFactory;
public class EventSender {
- private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json";
+ private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
private Map<String, String[]> streamidHash;
- private ApplicationSettings properties;
private EventPublisher eventPublisher;
-
- private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
private static final Logger log = LoggerFactory.getLogger(EventSender.class);
private static final String EVENT_LITERAL = "event";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) {
this.eventPublisher = eventPublisher;
this.streamidHash = properties.dMaaPStreamsMapping();
- this.properties = properties;
-
}
- public void send(JSONObject event) {
- streamidHash.get(getDomain(event))
- .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event))
- .forEach(streamIds -> sendEventsToStreams(event, streamIds));
+ public void send(JSONArray arrayOfEvents) {
+ for (int i = 0; i < arrayOfEvents.length(); i++) {
+ metriclog.info("EVENT_PUBLISH_START");
+ JSONObject object = (JSONObject) arrayOfEvents.get(i);
+ setLoggingContext(object);
+ streamidHash.get(getDomain(object))
+ .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object))
+ .forEach(streamIds -> sendEventsToStreams(object, streamIds));
+ log.debug("Message published" + object);
+ }
+ log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
+ metriclog.info("EVENT_PUBLISH_END");
}
- public static String getDomain(JSONObject event) {
+ private static String getDomain(JSONObject event) {
return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
}
private void sendEventsToStreams(JSONObject event, String[] streamIdList) {
for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
- eventPublisher.sendEvent(overrideEvent(event), aStreamIdList);
+ eventPublisher.sendEvent(event, aStreamIdList);
}
}
- private JSONObject overrideEvent(JSONObject event) {
- JSONObject jsonObject = addCurrentTimeToEvent(event);
- if (properties.eventTransformingEnabled()) {
- try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
- log.info("parse eventTransform.json");
- List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
- parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
- } catch (IOException e) {
- log.error(COULD_NOT_FIND_FILE, e);
- throw new ApplicationException(COULD_NOT_FIND_FILE, e);
- }
- }
- if (jsonObject.has("VESversion"))
- jsonObject.remove("VESversion");
-
- log.debug("Modified event:" + jsonObject);
- return jsonObject;
- }
-
- private JSONObject addCurrentTimeToEvent(JSONObject event) {
- final Date currentTime = new Date();
- JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
- JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
- commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
- event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
- return event;
- }
-
- private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
- for (Event eventTransform : eventsTransform) {
- JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
- if (configProcessorAdapter.isFilterMet(filterObj)) {
- callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
- }
- }
- }
-
- private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
- for (Processor processor : processors) {
- final String functionName = processor.functionName;
- final JSONObject args = new JSONObject(processor.args.toString());
- log.info(String.format("functionName==%s | args==%s", functionName, args));
- try {
- configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
- } catch (ReflectiveOperationException e) {
- log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
- }
- }
+ private void setLoggingContext(JSONObject event) {
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString());
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event));
}
}
diff --git a/src/main/java/org/onap/dcae/common/EventUpdater.java b/src/main/java/org/onap/dcae/common/EventUpdater.java
new file mode 100644
index 00000000..1caa4f18
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/EventUpdater.java
@@ -0,0 +1,137 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationException;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventUpdater {
+
+ private static final String EVENT_LIST = "eventList";
+ private static final String EVENT = "event";
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final String VES_VERSION = "VESversion";
+ private static final String COULD_NOT_FIND_FILE = "Couldn't find file ./etc/eventTransform.json";
+ private static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
+ private static final Logger log = LoggerFactory.getLogger(EventSender.class);
+ private static final String EVENT_LITERAL = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ private static final String EVENT_TRANSFORM = "./etc/eventTransform.json";
+ private ApplicationSettings settings;
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
+
+ public EventUpdater(ApplicationSettings settings) {
+ this.settings = settings;
+ }
+
+ public JSONArray convert(JSONObject jsonObject, String version, UUID uuid, String type){
+ if(type.equalsIgnoreCase(EVENT_LIST)){
+ return convertEvents(jsonObject, uuid.toString(), version);
+ }
+ else {
+ return convertEvent(jsonObject, uuid.toString(), version);
+ }
+ }
+
+ private JSONArray convertEvents(JSONObject jsonObject,
+ String uuid, String version) {
+ JSONArray asArrayEvents = new JSONArray();
+
+ JSONArray events = jsonObject.getJSONArray(EVENT_LIST);
+ for (int i = 0; i < events.length(); i++) {
+ JSONObject event = new JSONObject().put(EVENT, events.getJSONObject(i));
+ event.put(VES_UNIQUE_ID, uuid + "-" + i);
+ event.put(VES_VERSION, version);
+ asArrayEvents.put(overrideEvent(event));
+ }
+ return asArrayEvents;
+ }
+
+ private JSONArray convertEvent(JSONObject jsonObject, String uuid, String version) {
+ jsonObject.put(VES_UNIQUE_ID, uuid);
+ jsonObject.put(VES_VERSION, version);
+ return new JSONArray().put(overrideEvent(jsonObject));
+ }
+
+ private JSONObject overrideEvent(JSONObject event) {
+ JSONObject jsonObject = addCurrentTimeToEvent(event);
+ if (settings.eventTransformingEnabled()) {
+ try (FileReader fr = new FileReader(EVENT_TRANSFORM)) {
+ log.info("parse " + EVENT_TRANSFORM + " file");
+ List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
+ parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
+ } catch (IOException e) {
+ log.error(COULD_NOT_FIND_FILE, e);
+ throw new ApplicationException(COULD_NOT_FIND_FILE, e);
+ }
+ }
+ if (jsonObject.has(VES_VERSION))
+ jsonObject.remove(VES_VERSION);
+ log.debug("Modified event:" + jsonObject);
+ return jsonObject;
+ }
+
+ private JSONObject addCurrentTimeToEvent(JSONObject event) {
+ final Date currentTime = new Date();
+ JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
+ JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
+ commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
+ event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
+ return event;
+ }
+
+ private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
+ for (Event eventTransform : eventsTransform) {
+ JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
+ if (configProcessorAdapter.isFilterMet(filterObj)) {
+ callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
+ }
+ }
+ }
+
+ private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
+ for (Processor processor : processors) {
+ //TODO try to remove refection
+ final String functionName = processor.functionName;
+ final JSONObject args = new JSONObject(processor.args.toString());
+ log.info(String.format("functionName==%s | args==%s", functionName, args));
+ try {
+ configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
+ } catch (ReflectiveOperationException e) {
+ log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/EventValidator.java b/src/main/java/org/onap/dcae/restapi/EventValidator.java
new file mode 100644
index 00000000..f119b507
--- /dev/null
+++ b/src/main/java/org/onap/dcae/restapi/EventValidator.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nokia. All rights reserved.s
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.restapi;
+
+import static java.util.stream.StreamSupport.stream;
+
+import com.github.fge.jackson.JsonLoader;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchema;
+import java.util.Optional;
+import org.json.JSONObject;
+import org.onap.dcae.ApplicationException;
+import org.onap.dcae.ApplicationSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+
+public class EventValidator {
+
+ private static final Logger log = LoggerFactory.getLogger(EventValidator.class);
+
+ private ApplicationSettings applicationSettings;
+
+ public EventValidator(ApplicationSettings applicationSettings) {
+ this.applicationSettings = applicationSettings;
+ }
+
+ public Optional<ResponseEntity<String>> validate(JSONObject jsonObject, String type, String version){
+ if (applicationSettings.jsonSchemaValidationEnabled()) {
+ if (jsonObject.has(type)) {
+ if (!conformsToSchema(jsonObject, version)) {
+ return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
+ }
+ } else {
+ return errorResponse(ApiException.INVALID_JSON_INPUT);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private boolean conformsToSchema(JSONObject payload, String version) {
+ try {
+ JsonSchema schema = applicationSettings.jsonSchema(version);
+ ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
+ if (report.isSuccess()) {
+ return true;
+ }
+ log.warn("Schema validation failed for event: " + payload);
+ stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
+ return false;
+ } catch (Exception e) {
+ throw new ApplicationException("Unable to validate against schema", e);
+ }
+ }
+
+ private Optional<ResponseEntity<String>> errorResponse(ApiException noServerResources) {
+ return Optional.of(ResponseEntity.status(noServerResources.httpStatusCode)
+ .body(noServerResources.toJSON().toString()));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java
index 9c65619c..77c6802a 100644
--- a/src/main/java/org/onap/dcae/restapi/HealthCheckController.java
+++ b/src/main/java/org/onap/dcae/restapi/HealthCheckController.java
@@ -21,16 +21,20 @@
package org.onap.dcae.restapi;
-import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
-
-@Controller
+@RestController
public class HealthCheckController {
+ @GetMapping("/")
+ public String main() {
+ return "Welcome to VESCollector";
+ }
+
@GetMapping("/healthcheck")
public String healthCheck() {
- return "hello";
+ return "I'm good";
}
}
diff --git a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java
index 60740a80..267db054 100644
--- a/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java
+++ b/src/main/java/org/onap/dcae/restapi/SwaggerConfig.java
@@ -31,6 +31,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig{
+
@Bean
public Docket api() {
return new Docket(DocumentationType.SWAGGER_2)
@@ -39,5 +40,4 @@ public class SwaggerConfig{
.paths(PathSelectors.any())
.build();
}
-
}
diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java
index 3102c31c..b18eb7bc 100644
--- a/src/main/java/org/onap/dcae/restapi/VesRestController.java
+++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java
@@ -21,32 +21,27 @@
package org.onap.dcae.restapi;
-import static java.util.stream.StreamSupport.stream;
import static org.springframework.http.ResponseEntity.accepted;
+import static org.springframework.http.ResponseEntity.badRequest;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
-import com.github.fge.jackson.JsonLoader;
-import com.github.fge.jsonschema.core.report.ProcessingReport;
-import com.github.fge.jsonschema.main.JsonSchema;
-
+import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.http.HttpServletRequest;
-
import org.json.JSONArray;
import org.json.JSONObject;
-import org.onap.dcae.ApplicationException;
import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.EventSender;
import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.EventUpdater;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@@ -54,152 +49,64 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class VesRestController {
- private static final Logger log = LoggerFactory.getLogger(VesRestController.class);
- private static final String INVALID_JSON = ApiException.INVALID_JSON_INPUT.toJSON().toString();
- private final ApplicationSettings applicationSettings;
- private final LinkedBlockingQueue<JSONObject> inputQueue;
- private final Logger metricsLog;
- private final Logger errorLog;
- private final Logger incomingRequestsLogger;
+ private static final String VES_EVENT_MESSAGE = "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'";
+ private static final String EVENT_LIST = "eventList";
+ private static final String EVENT = "event";
+ private final ApplicationSettings settings;
+ private final Logger requestLogger;
+ private EventSender eventSender;
@Autowired
- VesRestController(ApplicationSettings applicationSettings,
- @Qualifier("metricsLog") Logger metricsLog,
- @Qualifier("errorLog") Logger errorLog,
+ VesRestController(ApplicationSettings settings,
@Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
- @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
- this.applicationSettings = applicationSettings;
- this.metricsLog = metricsLog;
- this.errorLog = errorLog;
- this.incomingRequestsLogger = incomingRequestsLogger;
- this.inputQueue = inputQueue;
- }
-
- @GetMapping("/")
- String mainPage() {
- return "Welcome to VESCollector";
+ @Qualifier("eventSender") EventSender eventSender) {
+ this.settings = settings;
+ this.requestLogger = incomingRequestsLogger;
+ this.eventSender = eventSender;
}
- //refactor in next iteration
- @PostMapping(value = {"/eventListener/v1",
- "/eventListener/v1/eventBatch",
- "/eventListener/v2",
- "/eventListener/v2/eventBatch",
- "/eventListener/v3",
- "/eventListener/v3/eventBatch",
- "/eventListener/v4",
- "/eventListener/v4/eventBatch",
- "/eventListener/v5",
- "/eventListener/v5/eventBatch",
- "/eventListener/v7",
- "/eventListener/v7/eventBatch"}, consumes = "application/json")
- ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
- String request = httpServletRequest.getRequestURI();
- String version = extractVersion(request);
-
- JSONObject jsonObject;
- try {
- jsonObject = new JSONObject(jsonPayload);
- } catch (Exception e) {
- log.error(INVALID_JSON);
- return ResponseEntity.badRequest().body(INVALID_JSON);
- }
-
- String uuid = setUpECOMPLoggingForRequest();
- incomingRequestsLogger.info(String.format(
- "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
- jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
-
- if (applicationSettings.jsonSchemaValidationEnabled()) {
- if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
- }
- } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
- if (!conformsToSchema(jsonObject, version)) {
- return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
- }
- } else {
- return errorResponse(ApiException.INVALID_JSON_INPUT);
- }
+ @PostMapping(value = {"/eventListener/{version}"}, consumes = "application/json")
+ ResponseEntity<String> event(@RequestBody String event, @PathVariable String version, HttpServletRequest request) {
+ if (settings.isVersionSupported(version)) {
+ return process(event, version, request, EVENT);
}
+ return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version));
+ }
- JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
- if (!putEventsOnProcessingQueue(commonlyFormatted)) {
- errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
- return errorResponse(ApiException.NO_SERVER_RESOURCES);
+ @PostMapping(value = {"/eventListener/{version}/eventBatch"}, consumes = "application/json")
+ ResponseEntity<String> events(@RequestBody String events, @PathVariable String version, HttpServletRequest request) {
+ if (settings.isVersionSupported(version)) {
+ return process(events, version, request, EVENT_LIST);
}
- return accepted()
- .contentType(MediaType.APPLICATION_JSON)
- .body("Accepted");
+ return badRequest().contentType(MediaType.APPLICATION_JSON).body(String.format("API version %s is not supported", version));
}
- private String extractVersion(String httpServletRequest) {
- return httpServletRequest.split("/")[2];
- }
+ private ResponseEntity<String> process(String events, String version, HttpServletRequest request, String type) {
- private ResponseEntity<String> errorResponse(ApiException noServerResources) {
- return ResponseEntity.status(noServerResources.httpStatusCode)
- .body(noServerResources.toJSON().toString());
- }
+ JSONObject jsonObject = new JSONObject(events);
- private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
- for (int i = 0; i < arrayOfEvents.length(); i++) {
- metricsLog.info("EVENT_PUBLISH_START");
- if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
- return false;
- }
- }
- log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
- metricsLog.info("EVENT_PUBLISH_END");
- return true;
- }
+ EventValidator eventValidator = new EventValidator(settings);
+ Optional<ResponseEntity<String>> validationResult = eventValidator.validate(jsonObject, type, version);
- private boolean conformsToSchema(JSONObject payload, String version) {
- try {
- JsonSchema schema = applicationSettings.jsonSchema(version);
- ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
- if (!report.isSuccess()) {
- log.warn("Schema validation failed for event: " + payload);
- stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
- return false;
- }
- return report.isSuccess();
- } catch (Exception e) {
- throw new ApplicationException("Unable to validate against schema", e);
+ if (validationResult.isPresent()){
+ return validationResult.get();
}
+ JSONArray arrayOfEvents = new EventUpdater(settings).convert(jsonObject,version, generateUUID(version, request.getRequestURI(), jsonObject), type);
+ eventSender.send(arrayOfEvents);
+ // TODO call service and return status, replace CambriaClient, split event to single object and list of them
+ return accepted().contentType(MediaType.APPLICATION_JSON).body("Accepted");
}
- private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
- String uuid, String version) {
- JSONArray asArrayEvents = new JSONArray();
- String vesUniqueIdKey = "VESuniqueId";
- String vesVersionKey = "VESversion";
- if (isBatchRequest(request)) {
- JSONArray events = jsonObject.getJSONArray("eventList");
- for (int i = 0; i < events.length(); i++) {
- JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
- event.put(vesUniqueIdKey, uuid + "-" + i);
- event.put(vesVersionKey, version);
- asArrayEvents.put(event);
- }
- } else {
- jsonObject.put(vesUniqueIdKey, uuid);
- jsonObject.put(vesVersionKey, version);
- asArrayEvents = new JSONArray().put(jsonObject);
- }
- return asArrayEvents;
+ private UUID generateUUID(String version, String uri, JSONObject jsonObject) {
+ UUID uuid = UUID.randomUUID();
+ setUpECOMPLoggingForRequest(uuid);
+ requestLogger.info(String.format(VES_EVENT_MESSAGE, jsonObject, uuid, version, uri));
+ return uuid;
}
- private static String setUpECOMPLoggingForRequest() {
- final UUID uuid = UUID.randomUUID();
+ private static void setUpECOMPLoggingForRequest(UUID uuid) {
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- return uuid.toString();
- }
-
- private static boolean isBatchRequest(String request) {
- return request.contains("eventBatch");
}
} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java
index 7059c4e5..c3e2a5de 100644
--- a/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java
+++ b/src/main/java/org/onap/dcae/restapi/WebMvcConfig.java
@@ -52,5 +52,4 @@ public class WebMvcConfig extends WebMvcConfigurationSupport {
resolver.setSuffix(".html");
return resolver;
}
-
}
diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
index 60287aef..6b0023f8 100644
--- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
+++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
@@ -235,25 +235,6 @@ public class ApplicationSettingsTest {
}
@Test
- public void shouldReturnMaximumAllowedQueuedEvents() throws IOException {
- // when
- int maximumAllowedQueuedEvents = fromTemporaryConfiguration("collector.inputQueue.maxPending=10000")
- .maximumAllowedQueuedEvents();
-
- // then
- assertEquals(10000, maximumAllowedQueuedEvents);
- }
-
- @Test
- public void shouldReturnDefaultMaximumAllowedQueuedEvents() throws IOException {
- // when
- int maximumAllowedQueuedEvents = fromTemporaryConfiguration().maximumAllowedQueuedEvents();
-
- // then
- assertEquals(1024 * 4, maximumAllowedQueuedEvents);
- }
-
- @Test
public void shouldTellIfSchemaValidationIsEnabled() throws IOException {
// when
boolean jsonSchemaValidationEnabled = fromTemporaryConfiguration("collector.schema.checkflag=1")
diff --git a/src/test/java/org/onap/dcae/TLSTestBase.java b/src/test/java/org/onap/dcae/TLSTestBase.java
index 4dada129..df10ead9 100644
--- a/src/test/java/org/onap/dcae/TLSTestBase.java
+++ b/src/test/java/org/onap/dcae/TLSTestBase.java
@@ -24,6 +24,7 @@ package org.onap.dcae;
import org.json.JSONObject;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
+import org.onap.dcae.common.EventSender;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
@@ -69,8 +70,8 @@ public class TLSTestBase {
protected abstract class TestClassBase {
@MockBean
- @Qualifier("inputQueue")
- protected LinkedBlockingQueue<JSONObject> queue;
+ @Qualifier("eventSender")
+ protected EventSender eventSender;
@LocalServerPort
private int port;
diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java
index aba3c2a9..f49d3cd8 100644
--- a/src/test/java/org/onap/dcae/common/EventSenderTest.java
+++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,7 +40,6 @@ import org.onap.dcae.common.publishing.EventPublisher;
@RunWith(MockitoJUnitRunner.Silent.class)
public class EventSenderTest {
-
private String event = "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1544016106000000,\"eventId\":\"fault33\",\"timeZoneOffset\":\"UTC+00.00\",\"priority\":\"Normal\",\"version\":\"4.0.1\",\"nfVendorName\":\"Ericsson\",\"reportingEntityName\":\"1\",\"sequence\":1,\"domain\":\"fault\",\"lastEpochMicrosec\":1544016106000000,\"eventName\":\"Fault_KeyFileFault\",\"vesEventListenerVersion\":\"7.0.1\",\"sourceName\":\"1\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"KeyFileFault\",\"faultFieldsVersion\":\"4.0\",\"eventCategory\":\"PROCESSINGERRORALARM\",\"specificProblem\":\"License Key File Fault_1\",\"alarmAdditionalInformation\":{\"probableCause\":\"ConfigurationOrCustomizationError\",\"additionalText\":\"test_1\",\"source\":\"ManagedElement=1,SystemFunctions=1,Lm=1\"},\"eventSourceType\":\"Lm\",\"vfStatus\":\"Active\"}}}\n";
@Mock
@@ -54,7 +54,10 @@ public class EventSenderTest {
public void shouldntSendEventWhenStreamIdsIsEmpty() {
when(settings.dMaaPStreamsMapping()).thenReturn(HashMap.empty());
eventSender = new EventSender(eventPublisher, settings );
- eventSender.send(new JSONObject(event));
+ JSONObject jsonObject = new JSONObject(event);
+ JSONArray jsonArray = new JSONArray();
+ jsonArray.put(jsonObject);
+ eventSender.send(jsonArray);
verify(eventPublisher,never()).sendEvent(any(),any());
}
@@ -63,7 +66,10 @@ public class EventSenderTest {
Map<String, String[]> streams = HashMap.of("fault", new String[]{"ves-fault", "fault-ves"});
when(settings.dMaaPStreamsMapping()).thenReturn(streams);
eventSender = new EventSender(eventPublisher, settings );
- eventSender.send(new JSONObject(event));
+ JSONObject jsonObject = new JSONObject(event);
+ JSONArray jsonArray = new JSONArray();
+ jsonArray.put(jsonObject);
+ eventSender.send(jsonArray);
verify(eventPublisher, times(2)).sendEvent(any(),any());
}
} \ No newline at end of file
diff --git a/src/test/resources/controller-config_dmaap_ip.json b/src/test/resources/controller-config_dmaap_ip.json
index 1cc6576b..f148db55 100644
--- a/src/test/resources/controller-config_dmaap_ip.json
+++ b/src/test/resources/controller-config_dmaap_ip.json
@@ -1,6 +1,5 @@
{
"auth.method": "noAuth",
- "collector.inputQueue.maxPending": 8096,
"collector.schema.checkflag": 1,
"collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
"tomcat.maxthreads": "200",
diff --git a/src/test/resources/controller-config_singleline_ip.json b/src/test/resources/controller-config_singleline_ip.json
index c3a8d067..a3974e0f 100644
--- a/src/test/resources/controller-config_singleline_ip.json
+++ b/src/test/resources/controller-config_singleline_ip.json
@@ -5,7 +5,6 @@
"tomcat.maxthreads": "200",
"collector.dmaap.streamid": "fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling",
"streams_subscribes": {},
- "collector.inputQueue.maxPending": "8096",
"streams_publishes": {
"ves-mobileflow": {
"type": "message_router",
diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties
index 9450067a..0916211f 100644
--- a/src/test/resources/test_collector_ip_op.properties
+++ b/src/test/resources/test_collector_ip_op.properties
@@ -9,7 +9,6 @@ collector.dmaapfile=./etc/DmaapConfig.json
auth.method=noAuth
header.authlist=sample1,$2a$10$pgjaxDzSuc6XVFEeqvxQ5u90DKJnM/u7TJTcinAlFJVaavXMWf/Zi|userid1,$2a$10$61gNubgJJl9lh3nvQvY9X.x4e5ETWJJ7ao7ZhJEvmfJigov26Z6uq|userid2,$2a$10$G52y/3uhuhWAMy.bx9Se8uzWinmbJa.dlm1LW6bYPdPkkywLDPLiy
event.transform.flag=1
-collector.inputQueue.maxPending = 8096
streams_subscribes = {}
services_calls = {}
tomcat.maxthreads = 200