summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor
diff options
context:
space:
mode:
authorStavros Kanarakis <stavros.kanarakis@nokia.com>2019-03-26 15:22:38 +0200
committerStavros Kanarakis <stavros.kanarakis@nokia.com>2019-03-27 15:11:46 +0200
commitf9e1cf6e9e91e587b9d387dc994058bcfdfc8c20 (patch)
treea623e15edfabe340a02c4b84f1276fe67ece1062 /components/bbs-event-processor
parent458ac66f25b9343447c5d3c4885594a61e328a86 (diff)
Bug fixes and performance improvements
When facing any kind of errors on retrieving PNF and service-instance objects from A&AI, BBSep now logs the error and keeps the reactive stream active. Synchronized access in tasks when there is a configuration update. Performance improvements in polling from DMaaP. Change-Id: I654fd1a7267f2b723cc66b0a93e4855003af2914 Issue-ID: DCAEGEN2-1354 Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor')
-rw-r--r--components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml2
-rw-r--r--components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template36
-rw-r--r--components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json70
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java171
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java5
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java5
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java46
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java18
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java21
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java40
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java42
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java10
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java11
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java53
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java37
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java54
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java29
-rw-r--r--components/bbs-event-processor/src/main/resources/application.yml6
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java24
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java12
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java12
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java12
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java12
27 files changed, 523 insertions, 217 deletions
diff --git a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
index 97bb138e..36e69cf6 100644
--- a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
+++ b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
@@ -25,5 +25,5 @@ application_rereg_cl_control_name: clControlNameReReg
application_cpeAuth_policy_scope: policyScopeCpeAuth
application_cpeAuth_cl_control_name: clControlNameCpeAuth
application_cbs_polling_interval_sec: 120
-dmaap_consumer_id: c12
+dmaap_consumer_id: c12
dmaap_consumer_group: OpenDcae-c12
diff --git a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
index 3468d7f6..e9550314 100644
--- a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
+++ b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
@@ -21,7 +21,7 @@ tosca_definitions_version: cloudify_dsl_1_3
imports:
- "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml"
- - {{ ONAPTEMPLATE_RAWREPOURL_org_onap_dcaegen2_platform_plugins_releases }}/k8splugin/1.4.5/k8splugin_types.yaml
+ - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R3/k8splugin/1.4.4/k8splugin_types.yaml
inputs:
aai_enrichment_host:
@@ -47,7 +47,31 @@ inputs:
cpe_authentication_url:
type: string
close_loop_url:
- type: string
+ type: string
+ application_policy_version:
+ description: Policy version value for building CL events
+ type: string
+ default: "1.0.0.5"
+ application_cl_target_type:
+ description: Close Loop target type value for building CL events
+ type: string
+ default: "VM"
+ application_cl_event_status:
+ description: Close Loop event status value for building CL events
+ type: string
+ default: "ONSET"
+ application_cl_version:
+ description: Close Loop version value for building CL events
+ type: string
+ default: "1.0.2"
+ application_cl_target:
+ description: Close Loop target value for building CL events
+ type: string
+ default: "vserver.vserver-name"
+ application_cl_originator:
+ description: Close Loop originator value for building CL events
+ type: string
+ default: "DCAE-BBS-ep"
application_rereg_policy_scope:
description: Policy Scope value for building PNF relocation CL event
type: string
@@ -101,6 +125,12 @@ node_templates:
application.pipelinesPollingIntervalSec: 30
application.pipelinesTimeoutSec: 15
application.cbsPollingIntervalSec: { get_input: application_cbs_polling_interval_sec }
+ application.policyVersion: { get_input: application_policy_version }
+ application.clTargetType: { get_input: application_cl_target_type }
+ application.clEventStatus: { get_input: application_cl_event_status }
+ application.clVersion: { get_input: application_cl_version }
+ application.clTarget: { get_input: application_cl_target }
+ application.clOriginator: { get_input: application_cl_originator }
application.reregistration.policyScope: { get_input: application_rereg_policy_scope }
application.reregistration.clControlName: { get_input: application_rereg_cl_control_name }
application.cpe.authentication.policyScope: { get_input: application_cpeAuth_policy_scope }
@@ -127,4 +157,4 @@ node_templates:
log_directory: "/opt/app/bbs-event-processor/logs"
tls_info:
cert_directory: '/opt/app/bbs-event-processor/etc/cert/'
- use_tls: false
+ use_tls: false \ No newline at end of file
diff --git a/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json b/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json
index b1329bef..8710a877 100644
--- a/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json
+++ b/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json
@@ -52,7 +52,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": false,
- "description": "DmaaP protocol used for any DMaaP interaction"
+ "description": "DMaaP protocol used for any DMaaP interaction"
},
{
"name": "dmaap.contentType",
@@ -60,7 +60,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": false,
- "description": "DmaaP content type"
+ "description": "DMaaP content type"
},
{
"name": "dmaap.consumer.consumerId",
@@ -68,7 +68,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": true,
- "description": "DmaaP consumer consumer ID"
+ "description": "DMaaP consumer consumer ID"
},
{
"name": "dmaap.consumer.consumerGroup",
@@ -76,7 +76,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": true,
- "description": "DmaaP consumer consumer group"
+ "description": "DMaaP consumer consumer group"
},
{
"name": "dmaap.messageLimit",
@@ -84,7 +84,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": false,
- "description": "DmaaP message limit"
+ "description": "DMaaP message limit"
},
{
"name": "dmaap.timeoutMs",
@@ -92,7 +92,7 @@
"designer_editable": true,
"policy_editable": false,
"sourced_at_deployment": false,
- "description": "DmaaP timeout in millis"
+ "description": "DMaaP timeout in millis"
},
{
"name": "aai.host",
@@ -168,7 +168,7 @@
"value": 300,
"constraints": [
{
- "greater_or_equal": 20
+ "greater_or_equal": 30
}
],
"designer_editable": true,
@@ -177,12 +177,60 @@
"description": "Polling interval in seconds for fetching configuration from Consul via CBS service"
},
{
+ "name": "application.policyVersion",
+ "value": "1.0.0.5",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Policy Version parameter in Policy triggering event to be published"
+ },
+ {
+ "name": "application.clTargetType",
+ "value": "VM",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Close Loop target type parameter in Policy triggering event to be published"
+ },
+ {
+ "name": "application.clEventStatus",
+ "value": "ONSET",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Close Loop event status parameter in Policy triggering event to be published"
+ },
+ {
+ "name": "application.clVersion",
+ "value": "1.0.2",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Close Loop version parameter in Policy triggering event to be published"
+ },
+ {
+ "name": "application.clTarget",
+ "value": "vserver.vserver-name",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Close Loop target parameter in Policy triggering event to be published"
+ },
+ {
+ "name": "application.clOriginator",
+ "value": "DCAE-BBS-ep",
+ "designer_editable": false,
+ "policy_editable": true,
+ "sourced_at_deployment": true,
+ "description": "Close Loop originator parameter in Policy triggering event to be published"
+ },
+ {
"name": "application.reregistration.policyScope",
"value": "policyScope",
"designer_editable": false,
"policy_editable": true,
"sourced_at_deployment": true,
- "description": "Hard-coded value for Policy Scope parameter in Policy triggering event to be published"
+ "description": "Policy Scope parameter in Policy triggering event to be published"
},
{
"name": "application.reregistration.clControlName",
@@ -190,7 +238,7 @@
"designer_editable": false,
"policy_editable": true,
"sourced_at_deployment": true,
- "description": "Hard-coded value for CL control name parameter in Policy triggering event to be published"
+ "description": "Close Loop control name parameter in Policy triggering event to be published"
},
{
"name": "application.cpe.authentication.policyScope",
@@ -198,7 +246,7 @@
"designer_editable": false,
"policy_editable": true,
"sourced_at_deployment": true,
- "description": "Hard-coded value for CL control nam parameter in Policy triggering event to be published"
+ "description": "Policy Scope parameter in Policy triggering event to be published"
},
{
"name": "application.cpe.authentication.clControlName",
@@ -206,7 +254,7 @@
"designer_editable": false,
"policy_editable": true,
"sourced_at_deployment": true,
- "description": "Hard-coded value for CL control nam parameter in Policy triggering event to be published"
+ "description": "Close Loop control nam parameter in Policy triggering event to be published"
},
{
"name": "application.reregistration.configKey",
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 5277f3cd..69fa83d2 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -93,51 +93,74 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
@Override
public synchronized void notifyObservers() {
- observers.forEach(o -> o.updateConfiguration(this));
+ observers.forEach(ConfigurationChangeObserver::updateConfiguration);
}
- public DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() {
return dmaapReRegistrationConsumerConfiguration;
}
- public DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() {
return dmaapCpeAuthenticationConsumerConfiguration;
}
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
- public AaiClientConfiguration getAaiClientConfiguration() {
+ public synchronized AaiClientConfiguration getAaiClientConfiguration() {
return aaiClientConfiguration;
}
- public int getPipelinesPollingIntervalInSeconds() {
+ public synchronized int getPipelinesPollingIntervalInSeconds() {
return genericProperties.getPipelinesPollingIntervalSec();
}
- public int getPipelinesTimeoutInSeconds() {
+ public synchronized int getPipelinesTimeoutInSeconds() {
return genericProperties.getPipelinesTimeoutSec();
}
- public int getCbsPollingInterval() {
- return cbsPollingInterval;
+ public synchronized String getPolicyVersion() {
+ return genericProperties.getPolicyVersion();
+ }
+
+ public synchronized String getCloseLoopTargetType() {
+ return genericProperties.getClTargetType();
+ }
+
+ public synchronized String getCloseLoopEventStatus() {
+ return genericProperties.getClEventStatus();
+ }
+
+ public synchronized String getCloseLoopVersion() {
+ return genericProperties.getClVersion();
+ }
+
+ public synchronized String getCloseLoopTarget() {
+ return genericProperties.getClTarget();
}
- public String getReRegistrationCloseLoopPolicyScope() {
+ public String getCloseLoopOriginator() {
+ return genericProperties.getClOriginator();
+ }
+ public synchronized int getCbsPollingInterval() {
+ return cbsPollingInterval;
+ }
+
+ public synchronized String getReRegistrationCloseLoopPolicyScope() {
return genericProperties.getReRegistration().getPolicyScope();
}
- public String getReRegistrationCloseLoopControlName() {
+ public synchronized String getReRegistrationCloseLoopControlName() {
return genericProperties.getReRegistration().getClControlName();
}
- public String getCpeAuthenticationCloseLoopPolicyScope() {
+ public synchronized String getCpeAuthenticationCloseLoopPolicyScope() {
return genericProperties.getCpeAuthentication().getPolicyScope();
}
- public String getCpeAuthenticationCloseLoopControlName() {
+ public synchronized String getCpeAuthenticationCloseLoopControlName() {
return genericProperties.getCpeAuthentication().getClControlName();
}
@@ -147,61 +170,71 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
*/
public void updateCurrentConfiguration(GeneratedAppConfigObject newConfiguration) {
- cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
-
- GeneratedAppConfigObject.StreamsObject reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(),
- newConfiguration.reRegConfigKey(), "PNF Re-Registration");
- TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
- dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
- dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
- dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
- dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
- dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
- dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
- dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
- dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapReRegistrationConfiguration();
-
- GeneratedAppConfigObject.StreamsObject cpeAuthObject = getStreamsObject(newConfiguration.streamSubscribesMap(),
- newConfiguration.cpeAuthConfigKey(), "CPE Authentication");
- topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl());
- dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
- dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
- dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
- dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
- dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
- dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
- dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
- dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapCpeAuthenticationConfiguration();
-
- GeneratedAppConfigObject.StreamsObject closeLoopObject = getStreamsObject(newConfiguration.streamPublishesMap(),
- newConfiguration.closeLoopConfigKey(), "Close Loop");
- topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl());
- dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
- dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
- dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
- dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
- dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- constructDmaapProducerConfiguration();
-
- aaiClientProperties.setAaiHost(newConfiguration.aaiHost());
- aaiClientProperties.setAaiPort(newConfiguration.aaiPort());
- aaiClientProperties.setAaiProtocol(newConfiguration.aaiProtocol());
- aaiClientProperties.setAaiUserName(newConfiguration.aaiUsername());
- aaiClientProperties.setAaiUserPassword(newConfiguration.aaiPassword());
- aaiClientProperties.setAaiIgnoreSslCertificateErrors(newConfiguration.aaiIgnoreSslCertificateErrors());
- constructAaiConfiguration();
-
-
- genericProperties.setPipelinesPollingIntervalSec(newConfiguration.pipelinesPollingIntervalSec());
- genericProperties.setPipelinesTimeoutSec(newConfiguration.pipelinesTimeoutSec());
- genericProperties.getReRegistration().setPolicyScope(newConfiguration.reRegistrationPolicyScope());
- genericProperties.getReRegistration().setClControlName(newConfiguration.reRegistrationClControlName());
- genericProperties.getCpeAuthentication().setPolicyScope(newConfiguration.cpeAuthPolicyScope());
- genericProperties.getCpeAuthentication().setClControlName(newConfiguration.cpeAuthClControlName());
+ synchronized (this) {
+ cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
+
+ GeneratedAppConfigObject.StreamsObject reRegObject =
+ getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
+ "PNF Re-Registration");
+ TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
+ dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
+ dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
+ dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
+ dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
+ dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
+ dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
+ dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
+ dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
+ constructDmaapReRegistrationConfiguration();
+
+ GeneratedAppConfigObject.StreamsObject cpeAuthObject =
+ getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(),
+ "CPE Authentication");
+ topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
+ dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
+ dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
+ dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
+ dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
+ constructDmaapCpeAuthenticationConfiguration();
+
+ GeneratedAppConfigObject.StreamsObject closeLoopObject =
+ getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(),
+ "Close Loop");
+ topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl());
+ dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
+ dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
+ dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
+ dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
+ constructDmaapProducerConfiguration();
+
+ aaiClientProperties.setAaiHost(newConfiguration.aaiHost());
+ aaiClientProperties.setAaiPort(newConfiguration.aaiPort());
+ aaiClientProperties.setAaiProtocol(newConfiguration.aaiProtocol());
+ aaiClientProperties.setAaiUserName(newConfiguration.aaiUsername());
+ aaiClientProperties.setAaiUserPassword(newConfiguration.aaiPassword());
+ aaiClientProperties.setAaiIgnoreSslCertificateErrors(newConfiguration.aaiIgnoreSslCertificateErrors());
+ constructAaiConfiguration();
+
+ genericProperties.setPipelinesPollingIntervalSec(newConfiguration.pipelinesPollingIntervalSec());
+ genericProperties.setPipelinesTimeoutSec(newConfiguration.pipelinesTimeoutSec());
+ genericProperties.setPolicyVersion(newConfiguration.policyVersion());
+ genericProperties.setClTargetType(newConfiguration.closeLoopTargetType());
+ genericProperties.setClEventStatus(newConfiguration.closeLoopEventStatus());
+ genericProperties.setClVersion(newConfiguration.closeLoopVersion());
+ genericProperties.setClTarget(newConfiguration.closeLoopTarget());
+ genericProperties.setClOriginator(newConfiguration.closeLoopOriginator());
+ genericProperties.getReRegistration().setPolicyScope(newConfiguration.reRegistrationPolicyScope());
+ genericProperties.getReRegistration().setClControlName(newConfiguration.reRegistrationClControlName());
+ genericProperties.getCpeAuthentication().setPolicyScope(newConfiguration.cpeAuthPolicyScope());
+ genericProperties.getCpeAuthentication().setClControlName(newConfiguration.cpeAuthClControlName());
+ }
notifyObservers();
}
@@ -210,7 +243,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
private GeneratedAppConfigObject.StreamsObject getStreamsObject(
Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) {
GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey);
- if (!streamsObject.type().equals(STREAMS_TYPE)) {
+ if (!STREAMS_TYPE.equals(streamsObject.type())) {
throw new ApplicationEnvironmentException(String.format("%s requires information about"
+ " message-router topic in ONAP", messageName));
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
index 5718432e..d2ac3c10 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
@@ -27,10 +27,13 @@ public class ApplicationConstants {
public static final String RETRIEVE_PNF_TASK_NAME = "PNF Retrieval";
public static final String RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME = "HSI CFS Service Instance Retrieval";
- public static final String STREAMS_TYPE = "message_router";
+ static final String STREAMS_TYPE = "message_router";
public static final String IN_SERVICE_NAME_IN_ONAP = "inService";
public static final String OUT_OF_SERVICE_NAME_IN_ONAP = "outOfService";
+ // Close Loop Constants
+ public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
+
private ApplicationConstants() {}
} \ No newline at end of file
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java
index d007c982..c246aaed 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java
@@ -23,8 +23,7 @@ package org.onap.bbs.event.processor.config;
public interface ConfigurationChangeObserver {
/**
- * Take actions upon new application configuration.
- * @param configuration new application configuration (complete configuration)
+ * Take actions upon updates in application configuration.
*/
- void updateConfiguration(ApplicationConfiguration configuration);
+ void updateConfiguration();
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 315fc793..6530f0b2 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -79,17 +79,7 @@ public class ConsulConfigurationGateway {
getMissingEnvironmentVariables()));
}
- RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
-
- // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
- EnvProperties env = EnvProperties.fromEnvironment();
-
- // Create the client and use it to get the configuration
- cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
- .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
- .retry(e -> true)
- .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
- .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
+ fetchConfig(initialDelay, period);
}
boolean environmentNotReady() {
@@ -121,6 +111,25 @@ public class ConsulConfigurationGateway {
private void handleErrors(Throwable throwable) {
LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable);
+ LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination."
+ + " Will start fetching after 60 seconds (please correct configuration in the meantime)"
+ + " and it will then poll every {} seconds (reverting to default)",
+ configuration.getCbsPollingInterval());
+ fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval()));
+ }
+
+ private void fetchConfig(Duration initialDelay, Duration period) {
+ RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
+ EnvProperties env = EnvProperties.fromEnvironment();
+
+ // Create the client and use it to get the configuration
+ cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
+ .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
+ .retry(e -> true)
+ .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
+ .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
}
@NotNull
@@ -149,11 +158,20 @@ public class ConsulConfigurationGateway {
final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
+
final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
final String cpeAuthClControlName =
configObject.get("application.cpe.authentication.clControlName").getAsString();
+
+ final String policyVersion = configObject.get("application.policyVersion").getAsString();
+ final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
+ final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
+ final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
+ final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
+ final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
+
final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
@@ -179,6 +197,12 @@ public class ConsulConfigurationGateway {
.cbsPollingIntervalSec(cbsPollingIntervalSec)
.reRegistrationPolicyScope(reRegPolicyScope)
.reRegistrationClControlName(reRegClControlName)
+ .policyVersion(policyVersion)
+ .closeLoopTargetType(closeLoopTargetType)
+ .closeLoopEventStatus(closeLoopEventStatus)
+ .closeLoopVersion(closeLoopVersion)
+ .closeLoopTarget(closeLoopTarget)
+ .closeLoopOriginator(closeLoopOriginator)
.cpeAuthPolicyScope(cpeAuthPolicyScope)
.cpeAuthClControlName(cpeAuthClControlName)
.reRegConfigKey(reRegConfigKey)
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java
index bc8020a4..71fd5bbd 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java
@@ -40,6 +40,24 @@ public class GenericProperties {
private int pipelinesTimeoutSec;
+ @NotBlank
+ private String policyVersion;
+
+ @NotBlank
+ private String clTargetType;
+
+ @NotBlank
+ private String clEventStatus;
+
+ @NotBlank
+ private String clVersion;
+
+ @NotBlank
+ private String clTarget;
+
+ @NotBlank
+ private String clOriginator;
+
private ReRegistrationGenericProperties reRegistration;
private CpeAuthenticationGenericProperties cpeAuthentication;
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
index c48ea6ef..12716ce4 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
@@ -74,8 +74,7 @@ public interface GeneratedAppConfigObject {
@SerializedName(value = "application.pipelinesTimeoutSec", alternate = "application.pipelinesTimeoutSec")
int pipelinesTimeoutSec();
- @SerializedName(value = "application.cbsPollingIntervalSec",
- alternate = "application.cbsPollingIntervalSec")
+ @SerializedName(value = "application.cbsPollingIntervalSec", alternate = "application.cbsPollingIntervalSec")
int cbsPollingIntervalSec();
@SerializedName(value = "application.reregistration.policyScope",
@@ -94,6 +93,24 @@ public interface GeneratedAppConfigObject {
alternate = "application.reregistration.clControlName")
String cpeAuthClControlName();
+ @SerializedName(value = "application.policyVersion", alternate = "application.policyVersion")
+ String policyVersion();
+
+ @SerializedName(value = "application.clTargetType", alternate = "application.clTargetType")
+ String closeLoopTargetType();
+
+ @SerializedName(value = "application.clEventStatus", alternate = "application.clEventStatus")
+ String closeLoopEventStatus();
+
+ @SerializedName(value = "application.clVersion", alternate = "application.clVersion")
+ String closeLoopVersion();
+
+ @SerializedName(value = "application.clTarget", alternate = "application.clTarget")
+ String closeLoopTarget();
+
+ @SerializedName(value = "application.clOriginator", alternate = "application.clOriginator")
+ String closeLoopOriginator();
+
@SerializedName(value = "application.reregistration.configKey", alternate = "application.reregistration.configKey")
String reRegConfigKey();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index a8d08576..d4688594 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -21,6 +21,7 @@
package org.onap.bbs.event.processor.pipelines;
import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
@@ -37,8 +38,6 @@ import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
-import org.onap.bbs.event.processor.exceptions.AaiTaskException;
-import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
@@ -66,14 +65,7 @@ public class CpeAuthenticationPipeline {
private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationPipeline.class);
- private static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
- private static final String POLICY_VERSION = "1.0.0.5";
private static final String POLICY_NAME = "CPE_Authentication";
- private static final String CLOSE_LOOP_TARGET_TYPE = "VM";
- private static final String CLOSED_LOOP_EVENT_STATUS = "ONSET";
- private static final String CLOSE_LOOP_VERSION = "1.0.2";
- private static final String CLOSE_LOOP_TARGET = "vserver.vserver-name";
- private static final String FROM = "DCAE";
private DmaapCpeAuthenticationConsumerTask consumerTask;
private DmaapPublisherTask publisherTask;
@@ -145,11 +137,13 @@ public class CpeAuthenticationPipeline {
if (e instanceof TimeoutException) {
LOGGER.warn("Timed out waiting for DMaaP response");
} else if (e instanceof EmptyDmaapResponseException) {
- LOGGER.warn("Nothing to consume from DMaaP");
+ LOGGER.info("Nothing to consume from DMaaP");
+ } else {
+ LOGGER.error("DMaaP Consumer error: {}", e.getMessage());
}
})
.onErrorResume(
- e -> (e instanceof EmptyDmaapResponseException || e instanceof TimeoutException),
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
@@ -180,7 +174,7 @@ public class CpeAuthenticationPipeline {
e.getMessage())
)
.onErrorResume(
- e -> e instanceof AaiTaskException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(p -> {
state.setPnfAaiObject(p);
@@ -199,9 +193,9 @@ public class CpeAuthenticationPipeline {
// towards the HSI CFS service
String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
- .filter(e -> e.getRelatedTo().equals("service-instance"))
+ .filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
- .filter(d -> d.getRelationshipKey().equals("service-instance.service-instance-id"))
+ .filter(d -> "service-instance.service-instance-id".equals(d.getRelationshipKey()))
.map(RelationshipListAaiObject.RelationshipDataEntryAaiObject::getRelationshipValue)
.findFirst().orElse("");
@@ -223,7 +217,7 @@ public class CpeAuthenticationPipeline {
e.getMessage())
)
.onErrorResume(
- e -> e instanceof AaiTaskException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(s -> {
state.setHsiCfsServiceInstance(s);
@@ -253,7 +247,7 @@ public class CpeAuthenticationPipeline {
)
.doOnError(e -> LOGGER.error("Error while triggering Policy: {}", e.getMessage()))
.onErrorResume(
- e -> e instanceof DmaapException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty());
}
@@ -267,7 +261,7 @@ public class CpeAuthenticationPipeline {
return optionalMetadata
.map(list -> list.getMetadataEntries()
.stream()
- .anyMatch(m -> m.getMetaname().equals("rgw-mac-address")
+ .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname())
&& m.getMetavalue().equals(eventRgwMacAddress)))
.orElse(false);
}
@@ -287,18 +281,18 @@ public class CpeAuthenticationPipeline {
ControlLoopPublisherDmaapModel triggerEvent = ImmutableControlLoopPublisherDmaapModel.builder()
.closedLoopEventClient(DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE)
- .policyVersion(POLICY_VERSION)
+ .policyVersion(configuration.getPolicyVersion())
.policyName(POLICY_NAME)
.policyScope(configuration.getCpeAuthenticationCloseLoopPolicyScope())
- .targetType(CLOSE_LOOP_TARGET_TYPE)
+ .targetType(configuration.getCloseLoopTargetType())
.aaiEnrichmentData(enrichmentData)
.closedLoopAlarmStart(Instant.now().getEpochSecond())
- .closedLoopEventStatus(CLOSED_LOOP_EVENT_STATUS)
+ .closedLoopEventStatus(configuration.getCloseLoopEventStatus())
.closedLoopControlName(configuration.getCpeAuthenticationCloseLoopControlName())
- .version(CLOSE_LOOP_VERSION)
- .target(CLOSE_LOOP_TARGET)
+ .version(configuration.getCloseLoopVersion())
+ .target(configuration.getCloseLoopTarget())
.requestId(UUID.randomUUID().toString())
- .originator(FROM)
+ .originator(configuration.getCloseLoopOriginator())
.build();
LOGGER.debug("Processing Step: Publish for Policy");
LOGGER.trace("Trigger Policy event: ({})",triggerEvent);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 7f96cdd5..0f28d7c5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -21,6 +21,7 @@
package org.onap.bbs.event.processor.pipelines;
import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
@@ -37,8 +38,6 @@ import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
-import org.onap.bbs.event.processor.exceptions.AaiTaskException;
-import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
@@ -65,14 +64,7 @@ public class ReRegistrationPipeline {
private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationPipeline.class);
- private static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
- private static final String POLICY_VERSION = "1.0.0.5";
private static final String POLICY_NAME = "Nomadic_ONT";
- private static final String CLOSE_LOOP_TARGET_TYPE = "VM";
- private static final String CLOSED_LOOP_EVENT_STATUS = "ONSET";
- private static final String CLOSE_LOOP_VERSION = "1.0.2";
- private static final String CLOSE_LOOP_TARGET = "vserver.vserver-name";
- private static final String FROM = "DCAE";
private DmaapReRegistrationConsumerTask consumerTask;
private DmaapPublisherTask publisherTask;
@@ -144,11 +136,13 @@ public class ReRegistrationPipeline {
if (e instanceof TimeoutException) {
LOGGER.warn("Timed out waiting for DMaaP response");
} else if (e instanceof EmptyDmaapResponseException) {
- LOGGER.warn("Nothing to consume from DMaaP");
+ LOGGER.info("Nothing to consume from DMaaP");
+ } else {
+ LOGGER.error("DMaaP Consumer error: {}", e.getMessage());
}
})
.onErrorResume(
- e -> (e instanceof EmptyDmaapResponseException || e instanceof TimeoutException),
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
@@ -179,7 +173,7 @@ public class ReRegistrationPipeline {
e.getMessage())
)
.onErrorResume(
- e -> e instanceof AaiTaskException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(p -> {
state.setPnfAaiObject(p);
@@ -204,9 +198,9 @@ public class ReRegistrationPipeline {
// towards the HSI CFS service
String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
- .filter(e -> e.getRelatedTo().equals("service-instance"))
+ .filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
- .filter(d -> d.getRelationshipKey().equals("service-instance.service-instance-id"))
+ .filter(d -> "service-instance.service-instance-id".equals(d.getRelationshipKey()))
.map(RelationshipListAaiObject.RelationshipDataEntryAaiObject::getRelationshipValue)
.findFirst().orElse("");
@@ -228,7 +222,7 @@ public class ReRegistrationPipeline {
e.getMessage())
)
.onErrorResume(
- e -> e instanceof AaiTaskException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty())
.map(s -> {
state.setHsiCfsServiceInstance(s);
@@ -241,7 +235,7 @@ public class ReRegistrationPipeline {
state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
// If no logical-link, fail further processing
- if (relationshipEntries.stream().noneMatch(e -> e.getRelatedTo().equals("logical-link"))) {
+ if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) {
LOGGER.warn("PNF {} does not have any logical-links bridged. Stop further processing",
state.getPnfAaiObject().getPnfName());
return true;
@@ -250,7 +244,7 @@ public class ReRegistrationPipeline {
// Assuming PNF will only have one logical-link per BBS use case design
boolean isNotRelocation = relationshipEntries
.stream()
- .filter(e -> e.getRelatedTo().equals("logical-link"))
+ .filter(e -> "logical-link".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
.anyMatch(d -> d.getRelationshipValue()
.equals(state.getReRegistrationEvent().getAttachmentPoint()));
@@ -278,7 +272,7 @@ public class ReRegistrationPipeline {
)
.doOnError(e -> LOGGER.error("Error while triggering Policy: {}", e.getMessage()))
.onErrorResume(
- e -> e instanceof DmaapException || e instanceof TimeoutException,
+ e -> e instanceof Exception,
e -> Mono.empty());
}
@@ -301,18 +295,18 @@ public class ReRegistrationPipeline {
ControlLoopPublisherDmaapModel triggerEvent = ImmutableControlLoopPublisherDmaapModel.builder()
.closedLoopEventClient(DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE)
- .policyVersion(POLICY_VERSION)
+ .policyVersion(configuration.getPolicyVersion())
.policyName(POLICY_NAME)
.policyScope(configuration.getReRegistrationCloseLoopPolicyScope())
- .targetType(CLOSE_LOOP_TARGET_TYPE)
+ .targetType(configuration.getCloseLoopTargetType())
.aaiEnrichmentData(enrichmentData)
.closedLoopAlarmStart(Instant.now().getEpochSecond())
- .closedLoopEventStatus(CLOSED_LOOP_EVENT_STATUS)
+ .closedLoopEventStatus(configuration.getCloseLoopEventStatus())
.closedLoopControlName(configuration.getReRegistrationCloseLoopControlName())
- .version(CLOSE_LOOP_VERSION)
- .target(CLOSE_LOOP_TARGET)
+ .version(configuration.getCloseLoopVersion())
+ .target(configuration.getCloseLoopTarget())
.requestId(UUID.randomUUID().toString())
- .originator(FROM)
+ .originator(configuration.getCloseLoopOriginator())
.build();
LOGGER.debug("Processing Step: Publish for Policy");
LOGGER.trace("Trigger Policy event: ({})",triggerEvent);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
index 64d212ad..5fbb0875 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
@@ -112,17 +112,13 @@ public class Scheduler implements ConfigurationChangeObserver {
}
@Override
- public void updateConfiguration(ApplicationConfiguration newConfiguration) {
- if (newConfiguration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval
- || newConfiguration.getCbsPollingInterval() != currentCbsPollingInterval) {
- configuration = newConfiguration;
- }
- if (newConfiguration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval) {
+ public void updateConfiguration() {
+ if (configuration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval) {
LOGGER.info("Pipelines Polling interval has changed. Re-scheduling processing pipelines");
cancelScheduledProcessingTasks();
reScheduleProcessingTasks();
}
- int newCbsPollingInterval = newConfiguration.getCbsPollingInterval();
+ int newCbsPollingInterval = configuration.getCbsPollingInterval();
if (newCbsPollingInterval != currentCbsPollingInterval) {
if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) {
LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job",
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java
index 239ccd6c..377fe0f6 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java
@@ -22,7 +22,6 @@ package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
-import org.onap.bbs.event.processor.utilities.AaiReactiveClient;
import reactor.core.publisher.Mono;
@@ -31,6 +30,4 @@ public interface AaiClientTask {
Mono<PnfAaiObject> executePnfRetrieval(String taskName, String url);
Mono<ServiceInstanceAaiObject> executeServiceInstanceRetrieval(String taskName, String url);
-
- AaiReactiveClient resolveClient();
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java
index 1dc43557..92516a04 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java
@@ -39,7 +39,7 @@ public class AaiClientTaskImpl implements AaiClientTask {
private final AaiReactiveClient reactiveClient;
@Autowired
- public AaiClientTaskImpl(AaiReactiveClient reactiveClient) {
+ AaiClientTaskImpl(AaiReactiveClient reactiveClient) {
this.reactiveClient = reactiveClient;
}
@@ -49,7 +49,7 @@ public class AaiClientTaskImpl implements AaiClientTask {
throw new AaiTaskException("Cannot invoke an A&AI client task with an invalid URL");
}
LOGGER.info("Executing task ({}) for retrieving PNF object", taskName);
- return resolveClient().getPnfObjectDataFor(url);
+ return reactiveClient.getPnfObjectDataFor(url);
}
@Override
@@ -58,11 +58,6 @@ public class AaiClientTaskImpl implements AaiClientTask {
throw new AaiTaskException("Cannot invoke an A&AI client task with an invalid URL");
}
LOGGER.info("Executing task ({}) for retrieving Service Instance object", taskName);
- return resolveClient().getServiceInstanceObjectDataFor(url);
- }
-
- @Override
- public AaiReactiveClient resolveClient() {
- return reactiveClient;
+ return reactiveClient.getServiceInstanceObjectDataFor(url);
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java
index 2e8a9ee2..5b7ade12 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java
@@ -23,13 +23,10 @@ package org.onap.bbs.event.processor.tasks;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
public interface DmaapCpeAuthenticationConsumerTask {
Flux<CpeAuthenticationConsumerDmaapModel> execute(String object) throws SSLException;
-
- DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index e9546638..4c5122d5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,9 +20,12 @@
package org.onap.bbs.event.processor.tasks;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
@@ -38,15 +41,21 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask {
+public class DmaapCpeAuthenticationConsumerTaskImpl
+ implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class);
- private final ApplicationConfiguration configuration;
+ private ApplicationConfiguration configuration;
private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser;
private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
+ new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
+
+ private DMaaPConsumerReactiveHttpClient httpClient;
+
@Autowired
- public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) {
+ public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(),
new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
@@ -54,20 +63,41 @@ public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthentic
DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration,
CpeAuthenticationDmaapConsumerJsonParser
cpeAuthenticationDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException {
this.configuration = configuration;
this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser;
this.httpClientFactory = httpClientFactory;
+
+ httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ }
+
+ @PostConstruct
+ void registerForConfigChanges() {
+ configuration.register(this);
+ }
+
+ @PreDestroy
+ void unRegisterForConfigChanges() {
+ configuration.unRegister(this);
}
@Override
- public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) throws SSLException {
+ public synchronized void updateConfiguration() {
+ try {
+ LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("Error while updating HTTP Client after a config update: SSL exception");
+ }
+ }
+
+ @Override
+ public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
- Mono<String> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse();
+ DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
+ Mono<String> response = httpClient.getDMaaPConsumerResponse();
return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(
- new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP")))
+ .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -75,8 +105,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthentic
});
}
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
+ return httpClient;
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index bc18064b..bddd2ecc 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,7 +21,6 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
@@ -29,6 +28,4 @@ import reactor.core.publisher.Mono;
public interface DmaapPublisherTask {
Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
-
- DMaaPPublisherReactiveHttpClient resolveClient();
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 1dd39bec..7b227211 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,7 +20,11 @@
package org.onap.bbs.event.processor.tasks;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
@@ -36,12 +40,14 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
-public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
+public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final ApplicationConfiguration configuration;
+ private ApplicationConfiguration configuration;
private final PublisherReactiveHttpClientFactory httpClientFactory;
+ private DMaaPPublisherReactiveHttpClient httpClient;
+
@Autowired
DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
@@ -52,6 +58,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
PublisherReactiveHttpClientFactory httpClientFactory) {
this.configuration = configuration;
this.httpClientFactory = httpClientFactory;
+
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ }
+
+ @PostConstruct
+ void registerForConfigChanges() {
+ configuration.register(this);
+ }
+
+ @PreDestroy
+ void unRegisterForConfigChanges() {
+ configuration.unRegister(this);
+ }
+
+ @Override
+ public synchronized void updateConfiguration() {
+ LOGGER.info("DMaaP Publisher update due to new application configuration");
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
}
@Override
@@ -60,12 +84,11 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
+ return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
}
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() {
- return httpClientFactory.create(configuration.getDmaapPublisherConfiguration());
+ private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
+ return httpClient;
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java
index ea459d00..77f2ef8e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java
@@ -23,13 +23,10 @@ package org.onap.bbs.event.processor.tasks;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
public interface DmaapReRegistrationConsumerTask {
Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) throws SSLException;
-
- DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index d8610215..6985396c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,9 +20,12 @@
package org.onap.bbs.event.processor.tasks;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
@@ -38,35 +41,63 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationConsumerTask {
+public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationConsumerTask,
+ ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class);
- private final ApplicationConfiguration configuration;
+ private ApplicationConfiguration configuration;
private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser;
private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
+ new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP");
+
+ private DMaaPConsumerReactiveHttpClient httpClient;
+
@Autowired
- public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) {
+ public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
this(configuration, new ReRegistrationDmaapConsumerJsonParser(),
new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration,
ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ ConsumerReactiveHttpClientFactory httpClientFactory)
+ throws SSLException {
this.configuration = configuration;
this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser;
this.httpClientFactory = httpClientFactory;
+
+ httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ }
+
+ @PostConstruct
+ void registerForConfigChanges() {
+ configuration.register(this);
+ }
+
+ @PreDestroy
+ void unRegisterForConfigChanges() {
+ configuration.unRegister(this);
}
@Override
- public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) throws SSLException {
+ public synchronized void updateConfiguration() {
+ try {
+ LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("Error while updating HTTP Client after a config update: SSL exception");
+ }
+ }
+
+ @Override
+ public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
- Mono<String> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse();
+ DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
+ Mono<String> response = httpClient.getDMaaPConsumerResponse();
return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(
- new EmptyDmaapResponseException("Re-Registration: Got an empty response from DMaaP")))
+ .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -74,8 +105,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
});
}
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(configuration.getDmaapReRegistrationConsumerConfiguration());
+ private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
+ return httpClient;
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 131551e5..e07de4bb 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -69,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
this.gson = gson;
this.sslFactory = new SslFactory();
- aaiClientConfiguration = configuration.getAaiClientConfiguration();
+ aaiClientConfiguration = this.configuration.getAaiClientConfiguration();
setupWebClient();
}
@@ -84,22 +84,24 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
}
@Override
- public void updateConfiguration(ApplicationConfiguration configuration) {
+ public void updateConfiguration() {
AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration();
if (aaiClientConfiguration.equals(newConfiguration)) {
- LOGGER.debug("No Configuration changes necessary for AAI Reactive client");
+ LOGGER.info("No Configuration changes necessary for AAI Reactive client");
} else {
- LOGGER.debug("AAI Reactive client must be re-configured");
- aaiClientConfiguration = newConfiguration;
- try {
- setupWebClient();
- } catch (SSLException e) {
- LOGGER.error("AAI Reactive client error while re-configuring WebClient");
+ synchronized (this) {
+ LOGGER.info("AAI Reactive client must be re-configured");
+ aaiClientConfiguration = newConfiguration;
+ try {
+ setupWebClient();
+ } catch (SSLException e) {
+ LOGGER.error("AAI Reactive client error while re-configuring WebClient");
+ }
}
}
}
- private synchronized void setupWebClient() throws SSLException {
+ private void setupWebClient() throws SSLException {
SslContext sslContext = createSslContext();
ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
@@ -127,8 +129,9 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
return performReactiveHttpGet(url, ServiceInstanceAaiObject.class);
}
- private synchronized <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) {
+ private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) {
LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName());
+ WebClient webClient = getWebClient();
return webClient
.get()
.uri(url)
@@ -186,4 +189,8 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
}
return sslFactory.createInsecureContext();
}
+
+ private synchronized WebClient getWebClient() {
+ return webClient;
+ }
}
diff --git a/components/bbs-event-processor/src/main/resources/application.yml b/components/bbs-event-processor/src/main/resources/application.yml
index c9820ebf..9092adae 100644
--- a/components/bbs-event-processor/src/main/resources/application.yml
+++ b/components/bbs-event-processor/src/main/resources/application.yml
@@ -56,6 +56,12 @@ configs:
application:
pipelinesPollingIntervalSec: 30
pipelinesTimeoutSec: 15
+ policyVersion: 1.0.0.5
+ clTargetType: VM
+ clEventStatus: ONSET
+ clVersion: 1.0.2
+ clTarget: vserver.vserver-name
+ clOriginator: DCAE-bbs-event-processor
re-registration:
policyScope: policyScope
clControlName: controlName
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
index 59feacaa..7b81b1be 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
@@ -95,6 +95,12 @@ import org.springframework.test.context.TestPropertySource;
"configs.security.enableAaiCertAuth=true",
"configs.application.pipelinesPollingIntervalSec=30",
"configs.application.pipelinesTimeoutSec=15",
+ "configs.application.policyVersion=1.0.0",
+ "configs.application.clTargetType=VM",
+ "configs.application.clEventStatus=ONSET",
+ "configs.application.clVersion=1.0.2",
+ "configs.application.clTarget=vserver.vserver-name",
+ "configs.application.clOriginator=DCAE-bbs-event-processor",
"configs.application.re-registration.policyScope=reRegPolicyScope",
"configs.application.re-registration.clControlName=reRegControlName",
"configs.application.cpe-authentication.policyScope=cpeAuthPolicyScope",
@@ -191,6 +197,12 @@ class ApplicationConfigurationTest {
assertAll("Generic Application Properties",
() -> assertEquals(30, configuration.getPipelinesPollingIntervalInSeconds()),
() -> assertEquals(15, configuration.getPipelinesTimeoutInSeconds()),
+ () -> assertEquals("1.0.0", configuration.getPolicyVersion()),
+ () -> assertEquals("VM", configuration.getCloseLoopTargetType()),
+ () -> assertEquals("ONSET", configuration.getCloseLoopEventStatus()),
+ () -> assertEquals("1.0.2", configuration.getCloseLoopVersion()),
+ () -> assertEquals("vserver.vserver-name", configuration.getCloseLoopTarget()),
+ () -> assertEquals("DCAE-bbs-event-processor", configuration.getCloseLoopOriginator()),
() -> assertEquals("reRegPolicyScope", configuration.getReRegistrationCloseLoopPolicyScope()),
() -> assertEquals("cpeAuthPolicyScope", configuration.getCpeAuthenticationCloseLoopPolicyScope()),
() -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()),
@@ -261,6 +273,12 @@ class ApplicationConfigurationTest {
.pipelinesPollingIntervalSec(20)
.pipelinesTimeoutSec(20)
.cbsPollingIntervalSec(180)
+ .policyVersion("2.0.0")
+ .closeLoopTargetType("VM2")
+ .closeLoopEventStatus("ONSET-update")
+ .closeLoopVersion("2.0.2")
+ .closeLoopTarget("Target-update")
+ .closeLoopOriginator("Originator-update")
.reRegistrationPolicyScope("policyScope-update")
.reRegistrationClControlName("controlName-update")
.cpeAuthPolicyScope("policyScope-update")
@@ -341,6 +359,12 @@ class ApplicationConfigurationTest {
() -> assertEquals(20, configuration.getPipelinesPollingIntervalInSeconds()),
() -> assertEquals(20, configuration.getPipelinesTimeoutInSeconds()),
() -> assertEquals(180, configuration.getCbsPollingInterval()),
+ () -> assertEquals("2.0.0", configuration.getPolicyVersion()),
+ () -> assertEquals("VM2", configuration.getCloseLoopTargetType()),
+ () -> assertEquals("ONSET-update", configuration.getCloseLoopEventStatus()),
+ () -> assertEquals("2.0.2", configuration.getCloseLoopVersion()),
+ () -> assertEquals("Target-update", configuration.getCloseLoopTarget()),
+ () -> assertEquals("Originator-update", configuration.getCloseLoopOriginator()),
() -> assertEquals("policyScope-update", configuration.getReRegistrationCloseLoopPolicyScope()),
() -> assertEquals("policyScope-update", configuration.getCpeAuthenticationCloseLoopPolicyScope()),
() -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()),
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
index ee75926b..cd20d1e1 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
@@ -74,6 +74,12 @@ class ConsulConfigurationGatewayTest {
+ "\"application.reregistration.clControlName\": \"controlName\","
+ "\"application.cpe.authentication.policyScope\": \"policyScope\","
+ "\"application.cpe.authentication.clControlName\": \"controlName\","
+ + "\"application.policyVersion\": \"1.0\","
+ + "\"application.clTargetType\": \"VM\","
+ + "\"application.clEventStatus\": \"ONSET\","
+ + "\"application.clVersion\": \"1.0.2\","
+ + "\"application.clTarget\": \"vserver.vserver-name\","
+ + "\"application.clOriginator\": \"DCAE-bbs-event-processor\","
+ "\"application.reregistration.configKey\": \"config_key_2\","
+ "\"application.cpeAuth.configKey\": \"config_key_1\","
+ "\"application.closeLoop.configKey\": \"config_key_3\","
@@ -186,6 +192,12 @@ class ConsulConfigurationGatewayTest {
.reRegistrationClControlName("controlName")
.cpeAuthPolicyScope("policyScope")
.cpeAuthClControlName("controlName")
+ .policyVersion("1.0")
+ .closeLoopTargetType("VM")
+ .closeLoopEventStatus("ONSET")
+ .closeLoopVersion("1.0.2")
+ .closeLoopTarget("vserver.vserver-name")
+ .closeLoopOriginator("DCAE-bbs-event-processor")
.reRegConfigKey("config_key_2")
.cpeAuthConfigKey("config_key_1")
.closeLoopConfigKey("config_key_3")
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java
index 12e393a1..91ebed8d 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java
@@ -36,6 +36,12 @@ import org.springframework.test.context.TestPropertySource;
@TestPropertySource(properties = {
"configs.application.pipelinesPollingIntervalSec=30",
"configs.application.pipelinesTimeoutSec=15",
+ "configs.application.policyVersion=1.0.0",
+ "configs.application.clTargetType=VM",
+ "configs.application.clEventStatus=ONSET",
+ "configs.application.clVersion=1.0.2",
+ "configs.application.clTarget=vserver.vserver-name",
+ "configs.application.clOriginator=DCAE-bbs-event-processor",
"configs.application.re-registration.policyScope=reRegPolicyScope",
"configs.application.re-registration.clControlName=reRegControlName",
"configs.application.cpe-authentication.policyScope=cpeAuthPolicyScope",
@@ -54,6 +60,12 @@ class GenericPropertiesTest {
assertAll("Generic Application Properties",
() -> assertEquals(30, genericProperties.getPipelinesPollingIntervalSec()),
() -> assertEquals(15, genericProperties.getPipelinesTimeoutSec()),
+ () -> assertEquals("1.0.0", genericProperties.getPolicyVersion()),
+ () -> assertEquals("VM", genericProperties.getClTargetType()),
+ () -> assertEquals("ONSET", genericProperties.getClEventStatus()),
+ () -> assertEquals("1.0.2", genericProperties.getClVersion()),
+ () -> assertEquals("vserver.vserver-name", genericProperties.getClTarget()),
+ () -> assertEquals("DCAE-bbs-event-processor", genericProperties.getClOriginator()),
() -> assertEquals("reRegPolicyScope", genericProperties.getReRegistration().getPolicyScope()),
() -> assertEquals("cpeAuthPolicyScope",
genericProperties.getCpeAuthentication().getPolicyScope()),
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
index dbac5bf2..76d9659c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
@@ -98,6 +98,18 @@ class CpeAuthenticationPipelineTest {
.thenReturn("controlName");
when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
.thenReturn("policyScope");
+ when(configuration.getPolicyVersion())
+ .thenReturn("1.0.0");
+ when(configuration.getCloseLoopTargetType())
+ .thenReturn("VM");
+ when(configuration.getCloseLoopEventStatus())
+ .thenReturn("ONSET");
+ when(configuration.getCloseLoopVersion())
+ .thenReturn("1.0.2");
+ when(configuration.getCloseLoopTarget())
+ .thenReturn("CL-Target");
+ when(configuration.getCloseLoopOriginator())
+ .thenReturn("DCAE-BBS-ep");
pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
publisherTask, aaiClientTask, new HashMap<>());
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
index dbd1aab1..a1b6b148 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
@@ -98,6 +98,18 @@ class ReRegistrationPipelineTest {
.thenReturn("controlName");
when(configuration.getReRegistrationCloseLoopPolicyScope())
.thenReturn("policyScope");
+ when(configuration.getPolicyVersion())
+ .thenReturn("1.0.0");
+ when(configuration.getCloseLoopTargetType())
+ .thenReturn("VM");
+ when(configuration.getCloseLoopEventStatus())
+ .thenReturn("ONSET");
+ when(configuration.getCloseLoopVersion())
+ .thenReturn("1.0.2");
+ when(configuration.getCloseLoopTarget())
+ .thenReturn("CL-Target");
+ when(configuration.getCloseLoopOriginator())
+ .thenReturn("DCAE-BBS-ep");
pipeline = new ReRegistrationPipeline(configuration, consumerTask,
publisherTask, aaiClientTask, new HashMap<>());