From f9e1cf6e9e91e587b9d387dc994058bcfdfc8c20 Mon Sep 17 00:00:00 2001 From: Stavros Kanarakis Date: Tue, 26 Mar 2019 15:22:38 +0200 Subject: Bug fixes and performance improvements When facing any kind of errors on retrieving PNF and service-instance objects from A&AI, BBSep now logs the error and keeps the reactive stream active. Synchronized access in tasks when there is a configuration update. Performance improvements in polling from DMaaP. Change-Id: I654fd1a7267f2b723cc66b0a93e4855003af2914 Issue-ID: DCAEGEN2-1354 Signed-off-by: Stavros Kanarakis --- .../dpo/blueprints/bbs-event-processor-input.yaml | 2 +- .../k8s-bbs-event-processor.yaml-template | 36 ++++- .../dpo/spec/bbs-event-processor-spec.json | 70 +++++++-- .../processor/config/ApplicationConfiguration.java | 171 ++++++++++++--------- .../processor/config/ApplicationConstants.java | 5 +- .../config/ConfigurationChangeObserver.java | 5 +- .../config/ConsulConfigurationGateway.java | 46 ++++-- .../event/processor/config/GenericProperties.java | 18 +++ .../processor/model/GeneratedAppConfigObject.java | 21 ++- .../pipelines/CpeAuthenticationPipeline.java | 40 ++--- .../pipelines/ReRegistrationPipeline.java | 42 +++-- .../bbs/event/processor/pipelines/Scheduler.java | 10 +- .../bbs/event/processor/tasks/AaiClientTask.java | 3 - .../event/processor/tasks/AaiClientTaskImpl.java | 11 +- .../tasks/DmaapCpeAuthenticationConsumerTask.java | 3 - .../DmaapCpeAuthenticationConsumerTaskImpl.java | 53 +++++-- .../event/processor/tasks/DmaapPublisherTask.java | 3 - .../processor/tasks/DmaapPublisherTaskImpl.java | 37 ++++- .../tasks/DmaapReRegistrationConsumerTask.java | 3 - .../tasks/DmaapReRegistrationConsumerTaskImpl.java | 54 +++++-- .../processor/utilities/AaiReactiveClient.java | 29 ++-- .../src/main/resources/application.yml | 6 + .../config/ApplicationConfigurationTest.java | 24 +++ .../config/ConsulConfigurationGatewayTest.java | 12 ++ .../processor/config/GenericPropertiesTest.java | 12 ++ .../pipelines/CpeAuthenticationPipelineTest.java | 12 ++ .../pipelines/ReRegistrationPipelineTest.java | 12 ++ 27 files changed, 523 insertions(+), 217 deletions(-) (limited to 'components/bbs-event-processor') diff --git a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml index 97bb138e..36e69cf6 100644 --- a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml +++ b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml @@ -25,5 +25,5 @@ application_rereg_cl_control_name: clControlNameReReg application_cpeAuth_policy_scope: policyScopeCpeAuth application_cpeAuth_cl_control_name: clControlNameCpeAuth application_cbs_polling_interval_sec: 120 -dmaap_consumer_id: c12 +dmaap_consumer_id: c12 dmaap_consumer_group: OpenDcae-c12 diff --git a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template index 3468d7f6..e9550314 100644 --- a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template +++ b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template @@ -21,7 +21,7 @@ tosca_definitions_version: cloudify_dsl_1_3 imports: - "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml" - - {{ ONAPTEMPLATE_RAWREPOURL_org_onap_dcaegen2_platform_plugins_releases }}/k8splugin/1.4.5/k8splugin_types.yaml + - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R3/k8splugin/1.4.4/k8splugin_types.yaml inputs: aai_enrichment_host: @@ -47,7 +47,31 @@ inputs: cpe_authentication_url: type: string close_loop_url: - type: string + type: string + application_policy_version: + description: Policy version value for building CL events + type: string + default: "1.0.0.5" + application_cl_target_type: + description: Close Loop target type value for building CL events + type: string + default: "VM" + application_cl_event_status: + description: Close Loop event status value for building CL events + type: string + default: "ONSET" + application_cl_version: + description: Close Loop version value for building CL events + type: string + default: "1.0.2" + application_cl_target: + description: Close Loop target value for building CL events + type: string + default: "vserver.vserver-name" + application_cl_originator: + description: Close Loop originator value for building CL events + type: string + default: "DCAE-BBS-ep" application_rereg_policy_scope: description: Policy Scope value for building PNF relocation CL event type: string @@ -101,6 +125,12 @@ node_templates: application.pipelinesPollingIntervalSec: 30 application.pipelinesTimeoutSec: 15 application.cbsPollingIntervalSec: { get_input: application_cbs_polling_interval_sec } + application.policyVersion: { get_input: application_policy_version } + application.clTargetType: { get_input: application_cl_target_type } + application.clEventStatus: { get_input: application_cl_event_status } + application.clVersion: { get_input: application_cl_version } + application.clTarget: { get_input: application_cl_target } + application.clOriginator: { get_input: application_cl_originator } application.reregistration.policyScope: { get_input: application_rereg_policy_scope } application.reregistration.clControlName: { get_input: application_rereg_cl_control_name } application.cpe.authentication.policyScope: { get_input: application_cpeAuth_policy_scope } @@ -127,4 +157,4 @@ node_templates: log_directory: "/opt/app/bbs-event-processor/logs" tls_info: cert_directory: '/opt/app/bbs-event-processor/etc/cert/' - use_tls: false + use_tls: false \ No newline at end of file diff --git a/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json b/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json index b1329bef..8710a877 100644 --- a/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json +++ b/components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json @@ -52,7 +52,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": false, - "description": "DmaaP protocol used for any DMaaP interaction" + "description": "DMaaP protocol used for any DMaaP interaction" }, { "name": "dmaap.contentType", @@ -60,7 +60,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": false, - "description": "DmaaP content type" + "description": "DMaaP content type" }, { "name": "dmaap.consumer.consumerId", @@ -68,7 +68,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": true, - "description": "DmaaP consumer consumer ID" + "description": "DMaaP consumer consumer ID" }, { "name": "dmaap.consumer.consumerGroup", @@ -76,7 +76,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": true, - "description": "DmaaP consumer consumer group" + "description": "DMaaP consumer consumer group" }, { "name": "dmaap.messageLimit", @@ -84,7 +84,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": false, - "description": "DmaaP message limit" + "description": "DMaaP message limit" }, { "name": "dmaap.timeoutMs", @@ -92,7 +92,7 @@ "designer_editable": true, "policy_editable": false, "sourced_at_deployment": false, - "description": "DmaaP timeout in millis" + "description": "DMaaP timeout in millis" }, { "name": "aai.host", @@ -168,7 +168,7 @@ "value": 300, "constraints": [ { - "greater_or_equal": 20 + "greater_or_equal": 30 } ], "designer_editable": true, @@ -176,13 +176,61 @@ "sourced_at_deployment": true, "description": "Polling interval in seconds for fetching configuration from Consul via CBS service" }, + { + "name": "application.policyVersion", + "value": "1.0.0.5", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Policy Version parameter in Policy triggering event to be published" + }, + { + "name": "application.clTargetType", + "value": "VM", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Close Loop target type parameter in Policy triggering event to be published" + }, + { + "name": "application.clEventStatus", + "value": "ONSET", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Close Loop event status parameter in Policy triggering event to be published" + }, + { + "name": "application.clVersion", + "value": "1.0.2", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Close Loop version parameter in Policy triggering event to be published" + }, + { + "name": "application.clTarget", + "value": "vserver.vserver-name", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Close Loop target parameter in Policy triggering event to be published" + }, + { + "name": "application.clOriginator", + "value": "DCAE-BBS-ep", + "designer_editable": false, + "policy_editable": true, + "sourced_at_deployment": true, + "description": "Close Loop originator parameter in Policy triggering event to be published" + }, { "name": "application.reregistration.policyScope", "value": "policyScope", "designer_editable": false, "policy_editable": true, "sourced_at_deployment": true, - "description": "Hard-coded value for Policy Scope parameter in Policy triggering event to be published" + "description": "Policy Scope parameter in Policy triggering event to be published" }, { "name": "application.reregistration.clControlName", @@ -190,7 +238,7 @@ "designer_editable": false, "policy_editable": true, "sourced_at_deployment": true, - "description": "Hard-coded value for CL control name parameter in Policy triggering event to be published" + "description": "Close Loop control name parameter in Policy triggering event to be published" }, { "name": "application.cpe.authentication.policyScope", @@ -198,7 +246,7 @@ "designer_editable": false, "policy_editable": true, "sourced_at_deployment": true, - "description": "Hard-coded value for CL control nam parameter in Policy triggering event to be published" + "description": "Policy Scope parameter in Policy triggering event to be published" }, { "name": "application.cpe.authentication.clControlName", @@ -206,7 +254,7 @@ "designer_editable": false, "policy_editable": true, "sourced_at_deployment": true, - "description": "Hard-coded value for CL control nam parameter in Policy triggering event to be published" + "description": "Close Loop control nam parameter in Policy triggering event to be published" }, { "name": "application.reregistration.configKey", diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java index 5277f3cd..69fa83d2 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java @@ -93,51 +93,74 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { @Override public synchronized void notifyObservers() { - observers.forEach(o -> o.updateConfiguration(this)); + observers.forEach(ConfigurationChangeObserver::updateConfiguration); } - public DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() { return dmaapReRegistrationConsumerConfiguration; } - public DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() { return dmaapCpeAuthenticationConsumerConfiguration; } - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } - public AaiClientConfiguration getAaiClientConfiguration() { + public synchronized AaiClientConfiguration getAaiClientConfiguration() { return aaiClientConfiguration; } - public int getPipelinesPollingIntervalInSeconds() { + public synchronized int getPipelinesPollingIntervalInSeconds() { return genericProperties.getPipelinesPollingIntervalSec(); } - public int getPipelinesTimeoutInSeconds() { + public synchronized int getPipelinesTimeoutInSeconds() { return genericProperties.getPipelinesTimeoutSec(); } - public int getCbsPollingInterval() { - return cbsPollingInterval; + public synchronized String getPolicyVersion() { + return genericProperties.getPolicyVersion(); + } + + public synchronized String getCloseLoopTargetType() { + return genericProperties.getClTargetType(); + } + + public synchronized String getCloseLoopEventStatus() { + return genericProperties.getClEventStatus(); + } + + public synchronized String getCloseLoopVersion() { + return genericProperties.getClVersion(); + } + + public synchronized String getCloseLoopTarget() { + return genericProperties.getClTarget(); } - public String getReRegistrationCloseLoopPolicyScope() { + public String getCloseLoopOriginator() { + return genericProperties.getClOriginator(); + } + public synchronized int getCbsPollingInterval() { + return cbsPollingInterval; + } + + public synchronized String getReRegistrationCloseLoopPolicyScope() { return genericProperties.getReRegistration().getPolicyScope(); } - public String getReRegistrationCloseLoopControlName() { + public synchronized String getReRegistrationCloseLoopControlName() { return genericProperties.getReRegistration().getClControlName(); } - public String getCpeAuthenticationCloseLoopPolicyScope() { + public synchronized String getCpeAuthenticationCloseLoopPolicyScope() { return genericProperties.getCpeAuthentication().getPolicyScope(); } - public String getCpeAuthenticationCloseLoopControlName() { + public synchronized String getCpeAuthenticationCloseLoopControlName() { return genericProperties.getCpeAuthentication().getClControlName(); } @@ -147,61 +170,71 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { */ public void updateCurrentConfiguration(GeneratedAppConfigObject newConfiguration) { - cbsPollingInterval = newConfiguration.cbsPollingIntervalSec(); - - GeneratedAppConfigObject.StreamsObject reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(), - newConfiguration.reRegConfigKey(), "PNF Re-Registration"); - TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); - dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); - dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); - dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); - dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); - dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); - dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); - dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); - dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); - dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapReRegistrationConfiguration(); - - GeneratedAppConfigObject.StreamsObject cpeAuthObject = getStreamsObject(newConfiguration.streamSubscribesMap(), - newConfiguration.cpeAuthConfigKey(), "CPE Authentication"); - topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl()); - dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); - dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); - dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); - dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); - dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); - dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); - dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); - dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); - dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapCpeAuthenticationConfiguration(); - - GeneratedAppConfigObject.StreamsObject closeLoopObject = getStreamsObject(newConfiguration.streamPublishesMap(), - newConfiguration.closeLoopConfigKey(), "Close Loop"); - topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl()); - dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost()); - dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); - dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); - dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); - dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); - constructDmaapProducerConfiguration(); - - aaiClientProperties.setAaiHost(newConfiguration.aaiHost()); - aaiClientProperties.setAaiPort(newConfiguration.aaiPort()); - aaiClientProperties.setAaiProtocol(newConfiguration.aaiProtocol()); - aaiClientProperties.setAaiUserName(newConfiguration.aaiUsername()); - aaiClientProperties.setAaiUserPassword(newConfiguration.aaiPassword()); - aaiClientProperties.setAaiIgnoreSslCertificateErrors(newConfiguration.aaiIgnoreSslCertificateErrors()); - constructAaiConfiguration(); - - - genericProperties.setPipelinesPollingIntervalSec(newConfiguration.pipelinesPollingIntervalSec()); - genericProperties.setPipelinesTimeoutSec(newConfiguration.pipelinesTimeoutSec()); - genericProperties.getReRegistration().setPolicyScope(newConfiguration.reRegistrationPolicyScope()); - genericProperties.getReRegistration().setClControlName(newConfiguration.reRegistrationClControlName()); - genericProperties.getCpeAuthentication().setPolicyScope(newConfiguration.cpeAuthPolicyScope()); - genericProperties.getCpeAuthentication().setClControlName(newConfiguration.cpeAuthClControlName()); + synchronized (this) { + cbsPollingInterval = newConfiguration.cbsPollingIntervalSec(); + + GeneratedAppConfigObject.StreamsObject reRegObject = + getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(), + "PNF Re-Registration"); + TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); + dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); + dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); + dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); + dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); + dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); + dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); + dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); + dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); + constructDmaapReRegistrationConfiguration(); + + GeneratedAppConfigObject.StreamsObject cpeAuthObject = + getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(), + "CPE Authentication"); + topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl()); + dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); + dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); + dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); + dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); + dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); + dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); + dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); + dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); + constructDmaapCpeAuthenticationConfiguration(); + + GeneratedAppConfigObject.StreamsObject closeLoopObject = + getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(), + "Close Loop"); + topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl()); + dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost()); + dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); + dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); + dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); + constructDmaapProducerConfiguration(); + + aaiClientProperties.setAaiHost(newConfiguration.aaiHost()); + aaiClientProperties.setAaiPort(newConfiguration.aaiPort()); + aaiClientProperties.setAaiProtocol(newConfiguration.aaiProtocol()); + aaiClientProperties.setAaiUserName(newConfiguration.aaiUsername()); + aaiClientProperties.setAaiUserPassword(newConfiguration.aaiPassword()); + aaiClientProperties.setAaiIgnoreSslCertificateErrors(newConfiguration.aaiIgnoreSslCertificateErrors()); + constructAaiConfiguration(); + + genericProperties.setPipelinesPollingIntervalSec(newConfiguration.pipelinesPollingIntervalSec()); + genericProperties.setPipelinesTimeoutSec(newConfiguration.pipelinesTimeoutSec()); + genericProperties.setPolicyVersion(newConfiguration.policyVersion()); + genericProperties.setClTargetType(newConfiguration.closeLoopTargetType()); + genericProperties.setClEventStatus(newConfiguration.closeLoopEventStatus()); + genericProperties.setClVersion(newConfiguration.closeLoopVersion()); + genericProperties.setClTarget(newConfiguration.closeLoopTarget()); + genericProperties.setClOriginator(newConfiguration.closeLoopOriginator()); + genericProperties.getReRegistration().setPolicyScope(newConfiguration.reRegistrationPolicyScope()); + genericProperties.getReRegistration().setClControlName(newConfiguration.reRegistrationClControlName()); + genericProperties.getCpeAuthentication().setPolicyScope(newConfiguration.cpeAuthPolicyScope()); + genericProperties.getCpeAuthentication().setClControlName(newConfiguration.cpeAuthClControlName()); + } notifyObservers(); } @@ -210,7 +243,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { private GeneratedAppConfigObject.StreamsObject getStreamsObject( Map map, String configKey, String messageName) { GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey); - if (!streamsObject.type().equals(STREAMS_TYPE)) { + if (!STREAMS_TYPE.equals(streamsObject.type())) { throw new ApplicationEnvironmentException(String.format("%s requires information about" + " message-router topic in ONAP", messageName)); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java index 5718432e..d2ac3c10 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java @@ -27,10 +27,13 @@ public class ApplicationConstants { public static final String RETRIEVE_PNF_TASK_NAME = "PNF Retrieval"; public static final String RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME = "HSI CFS Service Instance Retrieval"; - public static final String STREAMS_TYPE = "message_router"; + static final String STREAMS_TYPE = "message_router"; public static final String IN_SERVICE_NAME_IN_ONAP = "inService"; public static final String OUT_OF_SERVICE_NAME_IN_ONAP = "outOfService"; + // Close Loop Constants + public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance"; + private ApplicationConstants() {} } \ No newline at end of file diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java index d007c982..c246aaed 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConfigurationChangeObserver.java @@ -23,8 +23,7 @@ package org.onap.bbs.event.processor.config; public interface ConfigurationChangeObserver { /** - * Take actions upon new application configuration. - * @param configuration new application configuration (complete configuration) + * Take actions upon updates in application configuration. */ - void updateConfiguration(ApplicationConfiguration configuration); + void updateConfiguration(); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java index 315fc793..6530f0b2 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java @@ -79,17 +79,7 @@ public class ConsulConfigurationGateway { getMissingEnvironmentVariables())); } - RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); - - // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) - EnvProperties env = EnvProperties.fromEnvironment(); - - // Create the client and use it to get the configuration - cbsFetchPipeline = CbsClientFactory.createCbsClient(env) - .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) - .retry(e -> true) - .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period)) - .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors); + fetchConfig(initialDelay, period); } boolean environmentNotReady() { @@ -121,6 +111,25 @@ public class ConsulConfigurationGateway { private void handleErrors(Throwable throwable) { LOGGER.error("Periodic CBS configuration polling was terminated with error: {}", throwable); + LOGGER.info("Will restart CBS configuration fetching job due to abnormal termination." + + " Will start fetching after 60 seconds (please correct configuration in the meantime)" + + " and it will then poll every {} seconds (reverting to default)", + configuration.getCbsPollingInterval()); + fetchConfig(Duration.ofSeconds(60), Duration.ofSeconds(configuration.getCbsPollingInterval())); + } + + private void fetchConfig(Duration initialDelay, Duration period) { + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) + EnvProperties env = EnvProperties.fromEnvironment(); + + // Create the client and use it to get the configuration + cbsFetchPipeline = CbsClientFactory.createCbsClient(env) + .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) + .retry(e -> true) + .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period)) + .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors); } @NotNull @@ -149,11 +158,20 @@ public class ConsulConfigurationGateway { final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt(); final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt(); final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt(); + final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString(); final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString(); final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString(); final String cpeAuthClControlName = configObject.get("application.cpe.authentication.clControlName").getAsString(); + + final String policyVersion = configObject.get("application.policyVersion").getAsString(); + final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString(); + final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString(); + final String closeLoopVersion = configObject.get("application.clVersion").getAsString(); + final String closeLoopTarget = configObject.get("application.clTarget").getAsString(); + final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString(); + final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString(); final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString(); final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString(); @@ -179,6 +197,12 @@ public class ConsulConfigurationGateway { .cbsPollingIntervalSec(cbsPollingIntervalSec) .reRegistrationPolicyScope(reRegPolicyScope) .reRegistrationClControlName(reRegClControlName) + .policyVersion(policyVersion) + .closeLoopTargetType(closeLoopTargetType) + .closeLoopEventStatus(closeLoopEventStatus) + .closeLoopVersion(closeLoopVersion) + .closeLoopTarget(closeLoopTarget) + .closeLoopOriginator(closeLoopOriginator) .cpeAuthPolicyScope(cpeAuthPolicyScope) .cpeAuthClControlName(cpeAuthClControlName) .reRegConfigKey(reRegConfigKey) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java index bc8020a4..71fd5bbd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/GenericProperties.java @@ -40,6 +40,24 @@ public class GenericProperties { private int pipelinesTimeoutSec; + @NotBlank + private String policyVersion; + + @NotBlank + private String clTargetType; + + @NotBlank + private String clEventStatus; + + @NotBlank + private String clVersion; + + @NotBlank + private String clTarget; + + @NotBlank + private String clOriginator; + private ReRegistrationGenericProperties reRegistration; private CpeAuthenticationGenericProperties cpeAuthentication; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java index c48ea6ef..12716ce4 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java @@ -74,8 +74,7 @@ public interface GeneratedAppConfigObject { @SerializedName(value = "application.pipelinesTimeoutSec", alternate = "application.pipelinesTimeoutSec") int pipelinesTimeoutSec(); - @SerializedName(value = "application.cbsPollingIntervalSec", - alternate = "application.cbsPollingIntervalSec") + @SerializedName(value = "application.cbsPollingIntervalSec", alternate = "application.cbsPollingIntervalSec") int cbsPollingIntervalSec(); @SerializedName(value = "application.reregistration.policyScope", @@ -94,6 +93,24 @@ public interface GeneratedAppConfigObject { alternate = "application.reregistration.clControlName") String cpeAuthClControlName(); + @SerializedName(value = "application.policyVersion", alternate = "application.policyVersion") + String policyVersion(); + + @SerializedName(value = "application.clTargetType", alternate = "application.clTargetType") + String closeLoopTargetType(); + + @SerializedName(value = "application.clEventStatus", alternate = "application.clEventStatus") + String closeLoopEventStatus(); + + @SerializedName(value = "application.clVersion", alternate = "application.clVersion") + String closeLoopVersion(); + + @SerializedName(value = "application.clTarget", alternate = "application.clTarget") + String closeLoopTarget(); + + @SerializedName(value = "application.clOriginator", alternate = "application.clOriginator") + String closeLoopOriginator(); + @SerializedName(value = "application.reregistration.configKey", alternate = "application.reregistration.configKey") String reRegConfigKey(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java index a8d08576..d4688594 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java @@ -21,6 +21,7 @@ package org.onap.bbs.event.processor.pipelines; import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME; +import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; @@ -37,8 +38,6 @@ import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; -import org.onap.bbs.event.processor.exceptions.AaiTaskException; -import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; @@ -66,14 +65,7 @@ public class CpeAuthenticationPipeline { private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationPipeline.class); - private static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance"; - private static final String POLICY_VERSION = "1.0.0.5"; private static final String POLICY_NAME = "CPE_Authentication"; - private static final String CLOSE_LOOP_TARGET_TYPE = "VM"; - private static final String CLOSED_LOOP_EVENT_STATUS = "ONSET"; - private static final String CLOSE_LOOP_VERSION = "1.0.2"; - private static final String CLOSE_LOOP_TARGET = "vserver.vserver-name"; - private static final String FROM = "DCAE"; private DmaapCpeAuthenticationConsumerTask consumerTask; private DmaapPublisherTask publisherTask; @@ -145,11 +137,13 @@ public class CpeAuthenticationPipeline { if (e instanceof TimeoutException) { LOGGER.warn("Timed out waiting for DMaaP response"); } else if (e instanceof EmptyDmaapResponseException) { - LOGGER.warn("Nothing to consume from DMaaP"); + LOGGER.info("Nothing to consume from DMaaP"); + } else { + LOGGER.error("DMaaP Consumer error: {}", e.getMessage()); } }) .onErrorResume( - e -> (e instanceof EmptyDmaapResponseException || e instanceof TimeoutException), + e -> e instanceof Exception, e -> Mono.empty()) .map(event -> { // For each message, we have to keep separate state. This state will be enhanced @@ -180,7 +174,7 @@ public class CpeAuthenticationPipeline { e.getMessage()) ) .onErrorResume( - e -> e instanceof AaiTaskException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()) .map(p -> { state.setPnfAaiObject(p); @@ -199,9 +193,9 @@ public class CpeAuthenticationPipeline { // towards the HSI CFS service String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() - .filter(e -> e.getRelatedTo().equals("service-instance")) + .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) - .filter(d -> d.getRelationshipKey().equals("service-instance.service-instance-id")) + .filter(d -> "service-instance.service-instance-id".equals(d.getRelationshipKey())) .map(RelationshipListAaiObject.RelationshipDataEntryAaiObject::getRelationshipValue) .findFirst().orElse(""); @@ -223,7 +217,7 @@ public class CpeAuthenticationPipeline { e.getMessage()) ) .onErrorResume( - e -> e instanceof AaiTaskException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()) .map(s -> { state.setHsiCfsServiceInstance(s); @@ -253,7 +247,7 @@ public class CpeAuthenticationPipeline { ) .doOnError(e -> LOGGER.error("Error while triggering Policy: {}", e.getMessage())) .onErrorResume( - e -> e instanceof DmaapException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()); } @@ -267,7 +261,7 @@ public class CpeAuthenticationPipeline { return optionalMetadata .map(list -> list.getMetadataEntries() .stream() - .anyMatch(m -> m.getMetaname().equals("rgw-mac-address") + .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname()) && m.getMetavalue().equals(eventRgwMacAddress))) .orElse(false); } @@ -287,18 +281,18 @@ public class CpeAuthenticationPipeline { ControlLoopPublisherDmaapModel triggerEvent = ImmutableControlLoopPublisherDmaapModel.builder() .closedLoopEventClient(DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE) - .policyVersion(POLICY_VERSION) + .policyVersion(configuration.getPolicyVersion()) .policyName(POLICY_NAME) .policyScope(configuration.getCpeAuthenticationCloseLoopPolicyScope()) - .targetType(CLOSE_LOOP_TARGET_TYPE) + .targetType(configuration.getCloseLoopTargetType()) .aaiEnrichmentData(enrichmentData) .closedLoopAlarmStart(Instant.now().getEpochSecond()) - .closedLoopEventStatus(CLOSED_LOOP_EVENT_STATUS) + .closedLoopEventStatus(configuration.getCloseLoopEventStatus()) .closedLoopControlName(configuration.getCpeAuthenticationCloseLoopControlName()) - .version(CLOSE_LOOP_VERSION) - .target(CLOSE_LOOP_TARGET) + .version(configuration.getCloseLoopVersion()) + .target(configuration.getCloseLoopTarget()) .requestId(UUID.randomUUID().toString()) - .originator(FROM) + .originator(configuration.getCloseLoopOriginator()) .build(); LOGGER.debug("Processing Step: Publish for Policy"); LOGGER.trace("Trigger Policy event: ({})",triggerEvent); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java index 7f96cdd5..0f28d7c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java @@ -21,6 +21,7 @@ package org.onap.bbs.event.processor.pipelines; import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME; +import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; @@ -37,8 +38,6 @@ import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; -import org.onap.bbs.event.processor.exceptions.AaiTaskException; -import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; @@ -65,14 +64,7 @@ public class ReRegistrationPipeline { private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationPipeline.class); - private static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance"; - private static final String POLICY_VERSION = "1.0.0.5"; private static final String POLICY_NAME = "Nomadic_ONT"; - private static final String CLOSE_LOOP_TARGET_TYPE = "VM"; - private static final String CLOSED_LOOP_EVENT_STATUS = "ONSET"; - private static final String CLOSE_LOOP_VERSION = "1.0.2"; - private static final String CLOSE_LOOP_TARGET = "vserver.vserver-name"; - private static final String FROM = "DCAE"; private DmaapReRegistrationConsumerTask consumerTask; private DmaapPublisherTask publisherTask; @@ -144,11 +136,13 @@ public class ReRegistrationPipeline { if (e instanceof TimeoutException) { LOGGER.warn("Timed out waiting for DMaaP response"); } else if (e instanceof EmptyDmaapResponseException) { - LOGGER.warn("Nothing to consume from DMaaP"); + LOGGER.info("Nothing to consume from DMaaP"); + } else { + LOGGER.error("DMaaP Consumer error: {}", e.getMessage()); } }) .onErrorResume( - e -> (e instanceof EmptyDmaapResponseException || e instanceof TimeoutException), + e -> e instanceof Exception, e -> Mono.empty()) .map(event -> { // For each message, we have to keep separate state. This state will be enhanced @@ -179,7 +173,7 @@ public class ReRegistrationPipeline { e.getMessage()) ) .onErrorResume( - e -> e instanceof AaiTaskException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()) .map(p -> { state.setPnfAaiObject(p); @@ -204,9 +198,9 @@ public class ReRegistrationPipeline { // towards the HSI CFS service String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() - .filter(e -> e.getRelatedTo().equals("service-instance")) + .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) - .filter(d -> d.getRelationshipKey().equals("service-instance.service-instance-id")) + .filter(d -> "service-instance.service-instance-id".equals(d.getRelationshipKey())) .map(RelationshipListAaiObject.RelationshipDataEntryAaiObject::getRelationshipValue) .findFirst().orElse(""); @@ -228,7 +222,7 @@ public class ReRegistrationPipeline { e.getMessage()) ) .onErrorResume( - e -> e instanceof AaiTaskException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()) .map(s -> { state.setHsiCfsServiceInstance(s); @@ -241,7 +235,7 @@ public class ReRegistrationPipeline { state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries(); // If no logical-link, fail further processing - if (relationshipEntries.stream().noneMatch(e -> e.getRelatedTo().equals("logical-link"))) { + if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) { LOGGER.warn("PNF {} does not have any logical-links bridged. Stop further processing", state.getPnfAaiObject().getPnfName()); return true; @@ -250,7 +244,7 @@ public class ReRegistrationPipeline { // Assuming PNF will only have one logical-link per BBS use case design boolean isNotRelocation = relationshipEntries .stream() - .filter(e -> e.getRelatedTo().equals("logical-link")) + .filter(e -> "logical-link".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) .anyMatch(d -> d.getRelationshipValue() .equals(state.getReRegistrationEvent().getAttachmentPoint())); @@ -278,7 +272,7 @@ public class ReRegistrationPipeline { ) .doOnError(e -> LOGGER.error("Error while triggering Policy: {}", e.getMessage())) .onErrorResume( - e -> e instanceof DmaapException || e instanceof TimeoutException, + e -> e instanceof Exception, e -> Mono.empty()); } @@ -301,18 +295,18 @@ public class ReRegistrationPipeline { ControlLoopPublisherDmaapModel triggerEvent = ImmutableControlLoopPublisherDmaapModel.builder() .closedLoopEventClient(DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE) - .policyVersion(POLICY_VERSION) + .policyVersion(configuration.getPolicyVersion()) .policyName(POLICY_NAME) .policyScope(configuration.getReRegistrationCloseLoopPolicyScope()) - .targetType(CLOSE_LOOP_TARGET_TYPE) + .targetType(configuration.getCloseLoopTargetType()) .aaiEnrichmentData(enrichmentData) .closedLoopAlarmStart(Instant.now().getEpochSecond()) - .closedLoopEventStatus(CLOSED_LOOP_EVENT_STATUS) + .closedLoopEventStatus(configuration.getCloseLoopEventStatus()) .closedLoopControlName(configuration.getReRegistrationCloseLoopControlName()) - .version(CLOSE_LOOP_VERSION) - .target(CLOSE_LOOP_TARGET) + .version(configuration.getCloseLoopVersion()) + .target(configuration.getCloseLoopTarget()) .requestId(UUID.randomUUID().toString()) - .originator(FROM) + .originator(configuration.getCloseLoopOriginator()) .build(); LOGGER.debug("Processing Step: Publish for Policy"); LOGGER.trace("Trigger Policy event: ({})",triggerEvent); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java index 64d212ad..5fbb0875 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java @@ -112,17 +112,13 @@ public class Scheduler implements ConfigurationChangeObserver { } @Override - public void updateConfiguration(ApplicationConfiguration newConfiguration) { - if (newConfiguration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval - || newConfiguration.getCbsPollingInterval() != currentCbsPollingInterval) { - configuration = newConfiguration; - } - if (newConfiguration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval) { + public void updateConfiguration() { + if (configuration.getPipelinesPollingIntervalInSeconds() != currentPipelinesPollingInterval) { LOGGER.info("Pipelines Polling interval has changed. Re-scheduling processing pipelines"); cancelScheduledProcessingTasks(); reScheduleProcessingTasks(); } - int newCbsPollingInterval = newConfiguration.getCbsPollingInterval(); + int newCbsPollingInterval = configuration.getCbsPollingInterval(); if (newCbsPollingInterval != currentCbsPollingInterval) { if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) { LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job", diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java index 239ccd6c..377fe0f6 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTask.java @@ -22,7 +22,6 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; -import org.onap.bbs.event.processor.utilities.AaiReactiveClient; import reactor.core.publisher.Mono; @@ -31,6 +30,4 @@ public interface AaiClientTask { Mono executePnfRetrieval(String taskName, String url); Mono executeServiceInstanceRetrieval(String taskName, String url); - - AaiReactiveClient resolveClient(); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java index 1dc43557..92516a04 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImpl.java @@ -39,7 +39,7 @@ public class AaiClientTaskImpl implements AaiClientTask { private final AaiReactiveClient reactiveClient; @Autowired - public AaiClientTaskImpl(AaiReactiveClient reactiveClient) { + AaiClientTaskImpl(AaiReactiveClient reactiveClient) { this.reactiveClient = reactiveClient; } @@ -49,7 +49,7 @@ public class AaiClientTaskImpl implements AaiClientTask { throw new AaiTaskException("Cannot invoke an A&AI client task with an invalid URL"); } LOGGER.info("Executing task ({}) for retrieving PNF object", taskName); - return resolveClient().getPnfObjectDataFor(url); + return reactiveClient.getPnfObjectDataFor(url); } @Override @@ -58,11 +58,6 @@ public class AaiClientTaskImpl implements AaiClientTask { throw new AaiTaskException("Cannot invoke an A&AI client task with an invalid URL"); } LOGGER.info("Executing task ({}) for retrieving Service Instance object", taskName); - return resolveClient().getServiceInstanceObjectDataFor(url); - } - - @Override - public AaiReactiveClient resolveClient() { - return reactiveClient; + return reactiveClient.getServiceInstanceObjectDataFor(url); } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java index 2e8a9ee2..5b7ade12 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTask.java @@ -23,13 +23,10 @@ package org.onap.bbs.event.processor.tasks; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import reactor.core.publisher.Flux; public interface DmaapCpeAuthenticationConsumerTask { Flux execute(String object) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index e9546638..4c5122d5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,9 +20,12 @@ package org.onap.bbs.event.processor.tasks; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser; @@ -38,15 +41,21 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Component -public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask { +public class DmaapCpeAuthenticationConsumerTaskImpl + implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class); - private final ApplicationConfiguration configuration; + private ApplicationConfiguration configuration; private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser; private final ConsumerReactiveHttpClientFactory httpClientFactory; + private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = + new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); + + private DMaaPConsumerReactiveHttpClient httpClient; + @Autowired - public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) { + public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(), new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); } @@ -54,20 +63,41 @@ public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthentic DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration, CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException { this.configuration = configuration; this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser; this.httpClientFactory = httpClientFactory; + + httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + } + + @PostConstruct + void registerForConfigChanges() { + configuration.register(this); + } + + @PreDestroy + void unRegisterForConfigChanges() { + configuration.unRegister(this); } @Override - public Flux execute(String taskName) throws SSLException { + public synchronized void updateConfiguration() { + try { + LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + } catch (SSLException e) { + LOGGER.error("Error while updating HTTP Client after a config update: SSL exception"); + } + } + + @Override + public Flux execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); - Mono response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(); + DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); + Mono response = httpClient.getDMaaPConsumerResponse(); return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error( - new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"))) + .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -75,8 +105,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthentic }); } - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { + return httpClient; } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index bc18064b..bddd2ecc 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,7 +21,6 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; @@ -29,6 +28,4 @@ import reactor.core.publisher.Mono; public interface DmaapPublisherTask { Mono> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); - - DMaaPPublisherReactiveHttpClient resolveClient(); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 1dd39bec..7b227211 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,7 +20,11 @@ package org.onap.bbs.event.processor.tasks; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; @@ -36,12 +40,14 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @Component -public class DmaapPublisherTaskImpl implements DmaapPublisherTask { +public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final ApplicationConfiguration configuration; + private ApplicationConfiguration configuration; private final PublisherReactiveHttpClientFactory httpClientFactory; + private DMaaPPublisherReactiveHttpClient httpClient; + @Autowired DmaapPublisherTaskImpl(ApplicationConfiguration configuration) { this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), @@ -52,6 +58,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { PublisherReactiveHttpClientFactory httpClientFactory) { this.configuration = configuration; this.httpClientFactory = httpClientFactory; + + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + } + + @PostConstruct + void registerForConfigChanges() { + configuration.register(this); + } + + @PreDestroy + void unRegisterForConfigChanges() { + configuration.unRegister(this); + } + + @Override + public synchronized void updateConfiguration() { + LOGGER.info("DMaaP Publisher update due to new application configuration"); + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); } @Override @@ -60,12 +84,11 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel); - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); + return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel); } - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() { - return httpClientFactory.create(configuration.getDmaapPublisherConfiguration()); + private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { + return httpClient; } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java index ea459d00..77f2ef8e 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTask.java @@ -23,13 +23,10 @@ package org.onap.bbs.event.processor.tasks; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import reactor.core.publisher.Flux; public interface DmaapReRegistrationConsumerTask { Flux execute(String taskName) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index d8610215..6985396c 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,9 +20,12 @@ package org.onap.bbs.event.processor.tasks; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser; @@ -38,35 +41,63 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Component -public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationConsumerTask { +public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationConsumerTask, + ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class); - private final ApplicationConfiguration configuration; + private ApplicationConfiguration configuration; private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser; private final ConsumerReactiveHttpClientFactory httpClientFactory; + private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = + new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP"); + + private DMaaPConsumerReactiveHttpClient httpClient; + @Autowired - public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) { + public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { this(configuration, new ReRegistrationDmaapConsumerJsonParser(), new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); } DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration, ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + ConsumerReactiveHttpClientFactory httpClientFactory) + throws SSLException { this.configuration = configuration; this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser; this.httpClientFactory = httpClientFactory; + + httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); + } + + @PostConstruct + void registerForConfigChanges() { + configuration.register(this); + } + + @PreDestroy + void unRegisterForConfigChanges() { + configuration.unRegister(this); } @Override - public Flux execute(String taskName) throws SSLException { + public synchronized void updateConfiguration() { + try { + LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); + } catch (SSLException e) { + LOGGER.error("Error while updating HTTP Client after a config update: SSL exception"); + } + } + + @Override + public Flux execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); - Mono response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(); + DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); + Mono response = httpClient.getDMaaPConsumerResponse(); return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error( - new EmptyDmaapResponseException("Re-Registration: Got an empty response from DMaaP"))) + .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -74,8 +105,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC }); } - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(configuration.getDmaapReRegistrationConsumerConfiguration()); + private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { + return httpClient; } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 131551e5..e07de4bb 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -69,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { this.gson = gson; this.sslFactory = new SslFactory(); - aaiClientConfiguration = configuration.getAaiClientConfiguration(); + aaiClientConfiguration = this.configuration.getAaiClientConfiguration(); setupWebClient(); } @@ -84,22 +84,24 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { } @Override - public void updateConfiguration(ApplicationConfiguration configuration) { + public void updateConfiguration() { AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration(); if (aaiClientConfiguration.equals(newConfiguration)) { - LOGGER.debug("No Configuration changes necessary for AAI Reactive client"); + LOGGER.info("No Configuration changes necessary for AAI Reactive client"); } else { - LOGGER.debug("AAI Reactive client must be re-configured"); - aaiClientConfiguration = newConfiguration; - try { - setupWebClient(); - } catch (SSLException e) { - LOGGER.error("AAI Reactive client error while re-configuring WebClient"); + synchronized (this) { + LOGGER.info("AAI Reactive client must be re-configured"); + aaiClientConfiguration = newConfiguration; + try { + setupWebClient(); + } catch (SSLException e) { + LOGGER.error("AAI Reactive client error while re-configuring WebClient"); + } } } } - private synchronized void setupWebClient() throws SSLException { + private void setupWebClient() throws SSLException { SslContext sslContext = createSslContext(); ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector( @@ -127,8 +129,9 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { return performReactiveHttpGet(url, ServiceInstanceAaiObject.class); } - private synchronized Mono performReactiveHttpGet(String url, Class responseType) { + private Mono performReactiveHttpGet(String url, Class responseType) { LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName()); + WebClient webClient = getWebClient(); return webClient .get() .uri(url) @@ -186,4 +189,8 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { } return sslFactory.createInsecureContext(); } + + private synchronized WebClient getWebClient() { + return webClient; + } } diff --git a/components/bbs-event-processor/src/main/resources/application.yml b/components/bbs-event-processor/src/main/resources/application.yml index c9820ebf..9092adae 100644 --- a/components/bbs-event-processor/src/main/resources/application.yml +++ b/components/bbs-event-processor/src/main/resources/application.yml @@ -56,6 +56,12 @@ configs: application: pipelinesPollingIntervalSec: 30 pipelinesTimeoutSec: 15 + policyVersion: 1.0.0.5 + clTargetType: VM + clEventStatus: ONSET + clVersion: 1.0.2 + clTarget: vserver.vserver-name + clOriginator: DCAE-bbs-event-processor re-registration: policyScope: policyScope clControlName: controlName diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java index 59feacaa..7b81b1be 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java @@ -95,6 +95,12 @@ import org.springframework.test.context.TestPropertySource; "configs.security.enableAaiCertAuth=true", "configs.application.pipelinesPollingIntervalSec=30", "configs.application.pipelinesTimeoutSec=15", + "configs.application.policyVersion=1.0.0", + "configs.application.clTargetType=VM", + "configs.application.clEventStatus=ONSET", + "configs.application.clVersion=1.0.2", + "configs.application.clTarget=vserver.vserver-name", + "configs.application.clOriginator=DCAE-bbs-event-processor", "configs.application.re-registration.policyScope=reRegPolicyScope", "configs.application.re-registration.clControlName=reRegControlName", "configs.application.cpe-authentication.policyScope=cpeAuthPolicyScope", @@ -191,6 +197,12 @@ class ApplicationConfigurationTest { assertAll("Generic Application Properties", () -> assertEquals(30, configuration.getPipelinesPollingIntervalInSeconds()), () -> assertEquals(15, configuration.getPipelinesTimeoutInSeconds()), + () -> assertEquals("1.0.0", configuration.getPolicyVersion()), + () -> assertEquals("VM", configuration.getCloseLoopTargetType()), + () -> assertEquals("ONSET", configuration.getCloseLoopEventStatus()), + () -> assertEquals("1.0.2", configuration.getCloseLoopVersion()), + () -> assertEquals("vserver.vserver-name", configuration.getCloseLoopTarget()), + () -> assertEquals("DCAE-bbs-event-processor", configuration.getCloseLoopOriginator()), () -> assertEquals("reRegPolicyScope", configuration.getReRegistrationCloseLoopPolicyScope()), () -> assertEquals("cpeAuthPolicyScope", configuration.getCpeAuthenticationCloseLoopPolicyScope()), () -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()), @@ -261,6 +273,12 @@ class ApplicationConfigurationTest { .pipelinesPollingIntervalSec(20) .pipelinesTimeoutSec(20) .cbsPollingIntervalSec(180) + .policyVersion("2.0.0") + .closeLoopTargetType("VM2") + .closeLoopEventStatus("ONSET-update") + .closeLoopVersion("2.0.2") + .closeLoopTarget("Target-update") + .closeLoopOriginator("Originator-update") .reRegistrationPolicyScope("policyScope-update") .reRegistrationClControlName("controlName-update") .cpeAuthPolicyScope("policyScope-update") @@ -341,6 +359,12 @@ class ApplicationConfigurationTest { () -> assertEquals(20, configuration.getPipelinesPollingIntervalInSeconds()), () -> assertEquals(20, configuration.getPipelinesTimeoutInSeconds()), () -> assertEquals(180, configuration.getCbsPollingInterval()), + () -> assertEquals("2.0.0", configuration.getPolicyVersion()), + () -> assertEquals("VM2", configuration.getCloseLoopTargetType()), + () -> assertEquals("ONSET-update", configuration.getCloseLoopEventStatus()), + () -> assertEquals("2.0.2", configuration.getCloseLoopVersion()), + () -> assertEquals("Target-update", configuration.getCloseLoopTarget()), + () -> assertEquals("Originator-update", configuration.getCloseLoopOriginator()), () -> assertEquals("policyScope-update", configuration.getReRegistrationCloseLoopPolicyScope()), () -> assertEquals("policyScope-update", configuration.getCpeAuthenticationCloseLoopPolicyScope()), () -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()), diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java index ee75926b..cd20d1e1 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java @@ -74,6 +74,12 @@ class ConsulConfigurationGatewayTest { + "\"application.reregistration.clControlName\": \"controlName\"," + "\"application.cpe.authentication.policyScope\": \"policyScope\"," + "\"application.cpe.authentication.clControlName\": \"controlName\"," + + "\"application.policyVersion\": \"1.0\"," + + "\"application.clTargetType\": \"VM\"," + + "\"application.clEventStatus\": \"ONSET\"," + + "\"application.clVersion\": \"1.0.2\"," + + "\"application.clTarget\": \"vserver.vserver-name\"," + + "\"application.clOriginator\": \"DCAE-bbs-event-processor\"," + "\"application.reregistration.configKey\": \"config_key_2\"," + "\"application.cpeAuth.configKey\": \"config_key_1\"," + "\"application.closeLoop.configKey\": \"config_key_3\"," @@ -186,6 +192,12 @@ class ConsulConfigurationGatewayTest { .reRegistrationClControlName("controlName") .cpeAuthPolicyScope("policyScope") .cpeAuthClControlName("controlName") + .policyVersion("1.0") + .closeLoopTargetType("VM") + .closeLoopEventStatus("ONSET") + .closeLoopVersion("1.0.2") + .closeLoopTarget("vserver.vserver-name") + .closeLoopOriginator("DCAE-bbs-event-processor") .reRegConfigKey("config_key_2") .cpeAuthConfigKey("config_key_1") .closeLoopConfigKey("config_key_3") diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java index 12e393a1..91ebed8d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/GenericPropertiesTest.java @@ -36,6 +36,12 @@ import org.springframework.test.context.TestPropertySource; @TestPropertySource(properties = { "configs.application.pipelinesPollingIntervalSec=30", "configs.application.pipelinesTimeoutSec=15", + "configs.application.policyVersion=1.0.0", + "configs.application.clTargetType=VM", + "configs.application.clEventStatus=ONSET", + "configs.application.clVersion=1.0.2", + "configs.application.clTarget=vserver.vserver-name", + "configs.application.clOriginator=DCAE-bbs-event-processor", "configs.application.re-registration.policyScope=reRegPolicyScope", "configs.application.re-registration.clControlName=reRegControlName", "configs.application.cpe-authentication.policyScope=cpeAuthPolicyScope", @@ -54,6 +60,12 @@ class GenericPropertiesTest { assertAll("Generic Application Properties", () -> assertEquals(30, genericProperties.getPipelinesPollingIntervalSec()), () -> assertEquals(15, genericProperties.getPipelinesTimeoutSec()), + () -> assertEquals("1.0.0", genericProperties.getPolicyVersion()), + () -> assertEquals("VM", genericProperties.getClTargetType()), + () -> assertEquals("ONSET", genericProperties.getClEventStatus()), + () -> assertEquals("1.0.2", genericProperties.getClVersion()), + () -> assertEquals("vserver.vserver-name", genericProperties.getClTarget()), + () -> assertEquals("DCAE-bbs-event-processor", genericProperties.getClOriginator()), () -> assertEquals("reRegPolicyScope", genericProperties.getReRegistration().getPolicyScope()), () -> assertEquals("cpeAuthPolicyScope", genericProperties.getCpeAuthentication().getPolicyScope()), diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java index dbac5bf2..76d9659c 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java @@ -98,6 +98,18 @@ class CpeAuthenticationPipelineTest { .thenReturn("controlName"); when(configuration.getCpeAuthenticationCloseLoopPolicyScope()) .thenReturn("policyScope"); + when(configuration.getPolicyVersion()) + .thenReturn("1.0.0"); + when(configuration.getCloseLoopTargetType()) + .thenReturn("VM"); + when(configuration.getCloseLoopEventStatus()) + .thenReturn("ONSET"); + when(configuration.getCloseLoopVersion()) + .thenReturn("1.0.2"); + when(configuration.getCloseLoopTarget()) + .thenReturn("CL-Target"); + when(configuration.getCloseLoopOriginator()) + .thenReturn("DCAE-BBS-ep"); pipeline = new CpeAuthenticationPipeline(configuration, consumerTask, publisherTask, aaiClientTask, new HashMap<>()); diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java index dbd1aab1..a1b6b148 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java @@ -98,6 +98,18 @@ class ReRegistrationPipelineTest { .thenReturn("controlName"); when(configuration.getReRegistrationCloseLoopPolicyScope()) .thenReturn("policyScope"); + when(configuration.getPolicyVersion()) + .thenReturn("1.0.0"); + when(configuration.getCloseLoopTargetType()) + .thenReturn("VM"); + when(configuration.getCloseLoopEventStatus()) + .thenReturn("ONSET"); + when(configuration.getCloseLoopVersion()) + .thenReturn("1.0.2"); + when(configuration.getCloseLoopTarget()) + .thenReturn("CL-Target"); + when(configuration.getCloseLoopOriginator()) + .thenReturn("DCAE-BBS-ep"); pipeline = new ReRegistrationPipeline(configuration, consumerTask, publisherTask, aaiClientTask, new HashMap<>()); -- cgit 1.2.3-korg