summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src/main/java/org
diff options
context:
space:
mode:
authorStavros Kanarakis <stavros.kanarakis@nokia.com>2020-02-28 21:53:04 +0200
committerStavros Kanarakis <stavros.kanarakis@nokia.com>2020-03-03 17:30:22 +0200
commit22025d4ddfcccd86a2f93be7dadea9735e4b4528 (patch)
tree6b12e0770f65cb6a7389c0300fe95454ac1717fb /components/bbs-event-processor/src/main/java/org
parent82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff)
Upgrade of BBS-ep service
Upgraded service to use latest DCAE-SDK Upgraded many of the dependencies to latest versions Introduced Java 11 Change-Id: I29d265d2a75aa80749f567cfb10920b2c45c2cec Issue-ID: DCAEGEN2-2105 Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java2
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java164
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java113
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java48
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java3
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java66
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java53
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java16
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java85
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java6
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java72
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java78
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java43
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java2
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java33
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java12
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java6
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java98
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java8
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java6
25 files changed, 522 insertions, 411 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
index 66e1f86d..94b955f1 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
@@ -71,7 +71,7 @@ public class Application {
@Bean
TaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+ var scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(THREADS_IN_POOL);
scheduler.setThreadNamePrefix("pipeline-thrd-");
return scheduler;
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 5022a693..33a935e5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -21,7 +21,10 @@
package org.onap.bbs.event.processor.config;
import static org.onap.bbs.event.processor.config.ApplicationConstants.STREAMS_TYPE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
+import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -31,10 +34,12 @@ import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
import org.onap.bbs.event.processor.exceptions.ConfigurationParsingException;
import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
import org.onap.bbs.event.processor.utilities.LoggingUtil;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -48,9 +53,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
private final SecurityProperties securityProperties;
private final GenericProperties genericProperties;
- private DmaapConsumerConfiguration dmaapReRegistrationConsumerConfiguration;
- private DmaapConsumerConfiguration dmaapCpeAuthenticationConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private MessageRouterSubscriberConfig dmaapReRegistrationConsumerConfiguration;
+ private MessageRouterSubscriberConfig dmaapCpeAuthenticationConsumerConfiguration;
+ private MessageRouterPublisherConfig dmaapPublisherConfiguration;
private AaiClientConfiguration aaiClientConfiguration;
private Set<ConfigurationChangeObserver> observers;
@@ -97,15 +102,15 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
observers.forEach(ConfigurationChangeObserver::updateConfiguration);
}
- public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapReRegistrationConsumerConfiguration() {
return dmaapReRegistrationConsumerConfiguration;
}
- public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapCpeAuthenticationConsumerConfiguration() {
return dmaapCpeAuthenticationConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized MessageRouterPublisherConfig getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
@@ -117,6 +122,18 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
return genericProperties.getPipelinesPollingIntervalSec();
}
+ public synchronized DmaapProducerProperties getDmaapProducerProperties() {
+ return dmaapProducerProperties;
+ }
+
+ public synchronized DmaapReRegistrationConsumerProperties getDmaapReRegistrationConsumerProperties() {
+ return dmaapReRegistrationConsumerProperties;
+ }
+
+ public synchronized DmaapCpeAuthenticationConsumerProperties getDmaapCpeAuthenticationConsumerProperties() {
+ return dmaapCpeAuthenticationConsumerProperties;
+ }
+
public synchronized int getPipelinesTimeoutInSeconds() {
return genericProperties.getPipelinesTimeoutSec();
}
@@ -180,11 +197,17 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
-
- GeneratedAppConfigObject.StreamsObject reRegObject =
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+
+ var reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
- TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
+ var topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
@@ -196,9 +219,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapReRegistrationConfiguration();
+ constructDmaapReRegistrationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject cpeAuthObject =
+ var cpeAuthObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(),
"CPE Authentication");
topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl());
@@ -213,9 +236,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapCpeAuthenticationConfiguration();
+ constructDmaapCpeAuthenticationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject closeLoopObject =
+ var closeLoopObject =
getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(),
"Close Loop");
topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl());
@@ -226,7 +249,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- constructDmaapProducerConfiguration();
+ constructDmaapProducerConfiguration(securityKeys);
aaiClientProperties.setAaiHost(newConfiguration.aaiHost());
aaiClientProperties.setAaiPort(newConfiguration.aaiPort());
@@ -258,7 +281,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
@NotNull
private GeneratedAppConfigObject.StreamsObject getStreamsObject(
Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) {
- GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey);
+ var streamsObject = map.get(configKey);
if (!STREAMS_TYPE.equals(streamsObject.type())) {
throw new ApplicationEnvironmentException(String.format("%s requires information about"
+ " message-router topic in ONAP", messageName));
@@ -267,80 +290,45 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
}
private void constructConfigurationObjects() {
- constructDmaapReRegistrationConfiguration();
- constructDmaapCpeAuthenticationConfiguration();
- constructDmaapProducerConfiguration();
+ final SecurityKeys securityKeysForReRegistration = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapReRegistrationConfiguration(securityKeysForReRegistration);
+ final SecurityKeys securityKeysForCpeAuthentication = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapCpeAuthenticationConfiguration(securityKeysForCpeAuthentication);
+ final SecurityKeys securityKeysForProducer = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapProducerConfiguration(securityKeysForProducer);
constructAaiConfiguration();
}
- private void constructDmaapReRegistrationConfiguration() {
- dmaapReRegistrationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapReRegistrationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapReRegistrationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapReRegistrationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapReRegistrationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapReRegistrationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapReRegistrationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapReRegistrationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapReRegistrationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapReRegistrationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapReRegistrationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapReRegistrationConfiguration(final SecurityKeys securityKeys) {
+ dmaapReRegistrationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapCpeAuthenticationConfiguration() {
- dmaapCpeAuthenticationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapCpeAuthenticationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapCpeAuthenticationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapCpeAuthenticationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapCpeAuthenticationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapCpeAuthenticationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapCpeAuthenticationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapCpeAuthenticationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapCpeAuthenticationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapCpeAuthenticationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapCpeAuthenticationConfiguration(final SecurityKeys securityKeys) {
+ dmaapCpeAuthenticationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapProducerConfiguration() {
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapHostName(dmaapProducerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapProducerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapProducerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapProducerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapProducerProperties.getDmaapUserName() == null ? "" :
- dmaapProducerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapProducerProperties.getDmaapUserPassword() == null ? "" :
- dmaapProducerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapProducerProperties.getDmaapContentType())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapProducerConfiguration(final SecurityKeys securityKeys) {
+ dmaapPublisherConfiguration = ImmutableMessageRouterPublisherConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
@@ -362,19 +350,19 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
}
private TopicUrlInfo parseTopicUrl(String topicUrl) {
- String[] urlTokens = topicUrl.split(":");
+ var urlTokens = topicUrl.split(":");
if (urlTokens.length != 3) {
throw new ConfigurationParsingException("Wrong topic URL format");
}
- TopicUrlInfo topicUrlInfo = new TopicUrlInfo();
+ var topicUrlInfo = new TopicUrlInfo();
topicUrlInfo.setHost(urlTokens[1].replace("/", ""));
- String[] tokensAfterHost = urlTokens[2].split("/events/");
+ var tokensAfterHost = urlTokens[2].split("/events/");
if (tokensAfterHost.length != 2) {
throw new ConfigurationParsingException("Wrong topic name structure");
}
topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
- topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName(tokensAfterHost[1]);
return topicUrlInfo;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
index d2ac3c10..7be6bdea 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
@@ -35,5 +35,9 @@ public class ApplicationConstants {
// Close Loop Constants
public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
+ // Subscribe for DMaaP
+ public static final String SUBSCRIBE_URL_TEMPLATE = "%s://%s:%d/events/%s";
+ public static final String PUBLISH_URL_TEMPLATE = "%s://%s:%d/events/%s";
+
private ApplicationConstants() {}
} \ No newline at end of file
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 607b3b31..4ac85495 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -40,8 +40,7 @@ import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,9 +84,9 @@ public class ConsulConfigurationGateway {
}
boolean environmentNotReady() {
- String consulHost = System.getenv().get(CONSUL_HOST);
- String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
- String hostname = System.getenv().get(HOSTNAME);
+ var consulHost = System.getenv().get(CONSUL_HOST);
+ var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
+ var hostname = System.getenv().get(HOSTNAME);
return consulHost == null || cbs == null || hostname == null;
}
@@ -106,7 +105,7 @@ public class ConsulConfigurationGateway {
private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
- GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
+ var generatedAppConfigObject = generateAppConfigObject(jsonObject);
LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
configuration.updateCurrentConfiguration(generatedAppConfigObject);
}
@@ -124,10 +123,10 @@ public class ConsulConfigurationGateway {
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
- EnvProperties env = EnvProperties.fromEnvironment();
- CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
+ var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
+ var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
- cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
+ cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
.flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
@@ -138,57 +137,57 @@ public class ConsulConfigurationGateway {
GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
if (LOGGER.isInfoEnabled()) {
- String configAsString = gson.toJson(configObject);
+ var configAsString = gson.toJson(configObject);
LOGGER.info("Received App Config object\n{}", configAsString);
}
- final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
- final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
- final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
- final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
- final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
- final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
-
- final String aaiHost = configObject.get("aai.host").getAsString();
- final int aaiPort = configObject.get("aai.port").getAsInt();
- final String aaiProtocol = configObject.get("aai.protocol").getAsString();
- final String aaiUsername = configObject.get("aai.username").getAsString();
- final String aaiPassword = configObject.get("aai.password").getAsString();
- final boolean aaiIgnoreSslCertificateErrors =
+ final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
+ final var dmaapContentType = configObject.get("dmaap.contentType").getAsString();
+ final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
+ final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
+ final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
+ final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
+
+ final var aaiHost = configObject.get("aai.host").getAsString();
+ final var aaiPort = configObject.get("aai.port").getAsInt();
+ final var aaiProtocol = configObject.get("aai.protocol").getAsString();
+ final var aaiUsername = configObject.get("aai.username").getAsString();
+ final var aaiPassword = configObject.get("aai.password").getAsString();
+ final var aaiIgnoreSslCertificateErrors =
configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
- final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
- final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
- final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
+ final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
+ final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
+ final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
- final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
- final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
- final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
- final String cpeAuthClControlName =
+ final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
+ final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
+ final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
+ final var cpeAuthClControlName =
configObject.get("application.cpe.authentication.clControlName").getAsString();
- final String policyVersion = configObject.get("application.policyVersion").getAsString();
- final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
- final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
- final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
- final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
- final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
+ final var policyVersion = configObject.get("application.policyVersion").getAsString();
+ final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
+ final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
+ final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
+ final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
+ final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
- final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
- final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
- final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
+ final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
+ final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
+ final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
- final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
- final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
- final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
- final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
- final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
- final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
- final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+ final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
- final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
- final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
+ final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
+ final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
return ImmutableGeneratedAppConfigObject.builder()
.dmaapProtocol(dmaapProtocol)
@@ -259,22 +258,22 @@ public class ConsulConfigurationGateway {
private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
Map.Entry<String, JsonElement> jsonEntry) {
- JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
+ var closeLoopOutput = (JsonObject) jsonEntry.getValue();
- String type = closeLoopOutput.get("type").getAsString();
- String aafUsername = closeLoopOutput.get("aaf_username") != null
+ var type = closeLoopOutput.get("type").getAsString();
+ var aafUsername = closeLoopOutput.get("aaf_username") != null
? closeLoopOutput.get("aaf_username").getAsString() : "";
- String aafPassword = closeLoopOutput.get("aaf_password") != null
+ var aafPassword = closeLoopOutput.get("aaf_password") != null
? closeLoopOutput.get("aaf_password").getAsString() : "";
- JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
- String clientId = dmaapInfo.get("client_id") != null
+ var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
+ var clientId = dmaapInfo.get("client_id") != null
? dmaapInfo.get("client_id").getAsString() : "";
- String clientRole = dmaapInfo.get("client_role") != null
+ var clientRole = dmaapInfo.get("client_role") != null
? dmaapInfo.get("client_role").getAsString() : "";
- String location = dmaapInfo.get("location") != null
+ var location = dmaapInfo.get("location") != null
? dmaapInfo.get("location").getAsString() : "";
- String topicUrl = dmaapInfo.get("topic_url").getAsString();
+ var topicUrl = dmaapInfo.get("topic_url").getAsString();
GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
.clientId(clientId)
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
new file mode 100644
index 00000000..b0ecfd51
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+*/
+
+package org.onap.bbs.event.processor.config;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MessageRouterConfig {
+
+ @Bean(name = "ReRegMessageRouterSubscriber")
+ public MessageRouterSubscriber reRegistrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+ return DmaapClientFactory
+ .createMessageRouterSubscriber(configuration.getDmaapReRegistrationConsumerConfiguration());
+ }
+
+ @Bean(name = "CpeAuthMessageRouterSubscriber")
+ public MessageRouterSubscriber registrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+ return DmaapClientFactory
+ .createMessageRouterSubscriber(configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ }
+
+ @Bean
+ public MessageRouterPublisher mrPub(ApplicationConfiguration configuration) {
+ return DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
index 1be4f43a..e2e49797 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
@@ -22,10 +22,6 @@ package org.onap.bbs.event.processor.exceptions;
public class EmptyDmaapResponseException extends RuntimeException {
- public EmptyDmaapResponseException() {
- super();
- }
-
public EmptyDmaapResponseException(String message) {
super(message);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
index 8fa73e42..c952e9bd 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
@@ -26,11 +26,10 @@ import java.util.Map;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ControlLoopPublisherDmaapModel extends DmaapModel {
+public interface ControlLoopPublisherDmaapModel {
@SerializedName(value = "closedLoopEventClient", alternate = "closedLoopEventClient")
String getClosedLoopEventClient();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
index 42c9896f..50ed66ef 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
@@ -26,12 +26,10 @@ import java.util.Optional;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface CpeAuthenticationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface CpeAuthenticationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
index 682c0641..f64f30e5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
@@ -26,11 +26,10 @@ import java.util.Optional;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true, emptyAsNulls = true)
-public interface PnfAaiObject extends ClientModel {
+public interface PnfAaiObject {
@SerializedName(value = "pnf-name", alternate = "pnf-name")
String getPnfName();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
index 07fb75aa..3f58aa33 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
@@ -24,12 +24,10 @@ import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ReRegistrationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface ReRegistrationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index a30903bb..2cab017f 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -25,13 +25,11 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -42,14 +40,13 @@ import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.bbs.event.processor.model.MetadataListAaiObject;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -99,7 +96,7 @@ public class CpeAuthenticationPipeline {
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
@@ -111,12 +108,12 @@ public class CpeAuthenticationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("CPE Authentication event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("CPE Authentication event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("CPE Authentication event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -149,7 +146,7 @@ public class CpeAuthenticationPipeline {
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setCpeAuthenticationEvent(event);
return state;
});
@@ -161,9 +158,9 @@ public class CpeAuthenticationPipeline {
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- CpeAuthenticationConsumerDmaapModel vesEvent = state.getCpeAuthenticationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getCpeAuthenticationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -191,10 +188,10 @@ public class CpeAuthenticationPipeline {
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -208,7 +205,7 @@ public class CpeAuthenticationPipeline {
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -230,21 +227,13 @@ public class CpeAuthenticationPipeline {
});
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
- }
-
- // At this point, we must check if the PNF RGW MAC address matches the value extracted from VES event
- if (!isCorrectMacAddress(state)) {
- LOGGER.warn("Processing stopped. RGW MAC address taken from event ({}) "
- + "does not match with A&AI metadata corresponding value",
- state.getCpeAuthenticationEvent().getRgwMacAddress());
- return Mono.empty();
+ return Flux.empty();
}
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -256,30 +245,15 @@ public class CpeAuthenticationPipeline {
e -> Mono.empty());
}
-
- private boolean isCorrectMacAddress(PipelineState state) {
- // We need to check if the RGW MAC address received in VES event matches the one found in
- // HSIA CFS service (in its metadata section)
- Optional<MetadataListAaiObject> optionalMetadata = state.getHsiCfsServiceInstance()
- .getMetadataListAaiObject();
- String eventRgwMacAddress = state.getCpeAuthenticationEvent().getRgwMacAddress().orElse("");
- return optionalMetadata
- .map(list -> list.getMetadataEntries()
- .stream()
- .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname())
- && m.getMetavalue().equals(eventRgwMacAddress)))
- .orElse(false);
- }
-
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
enrichmentData.put("cpe.old-authentication-state", state.cpeAuthenticationEvent.getOldAuthenticationState());
enrichmentData.put("cpe.new-authentication-state", state.cpeAuthenticationEvent.getNewAuthenticationState());
- String swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
+ var swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
if (!StringUtils.isEmpty(swVersion)) {
enrichmentData.put("cpe.swVersion", swVersion);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 33a9aea7..0bc5e353 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -25,12 +25,10 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -48,7 +46,7 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -98,7 +96,7 @@ public class ReRegistrationPipeline {
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
@@ -110,12 +108,12 @@ public class ReRegistrationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("PNF Re-Registration event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("PNF Re-Registration event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("PNF Re-Registration event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -148,7 +146,7 @@ public class ReRegistrationPipeline {
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setReRegistrationEvent(event);
return state;
});
@@ -160,9 +158,9 @@ public class ReRegistrationPipeline {
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- ReRegistrationConsumerDmaapModel vesEvent = state.getReRegistrationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getReRegistrationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -196,10 +194,10 @@ public class ReRegistrationPipeline {
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -213,7 +211,7 @@ public class ReRegistrationPipeline {
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -236,8 +234,7 @@ public class ReRegistrationPipeline {
}
private boolean isNotReallyAnOntRelocation(PipelineState state) {
- List<RelationshipListAaiObject.RelationshipEntryAaiObject> relationshipEntries =
- state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
+ var relationshipEntries = state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
// If no logical-link, fail further processing
if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) {
@@ -247,7 +244,7 @@ public class ReRegistrationPipeline {
}
// Assuming PNF will only have one logical-link per BBS use case design
- boolean isNotRelocation = relationshipEntries
+ var isNotRelocation = relationshipEntries
.stream()
.filter(e -> "logical-link".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -263,13 +260,13 @@ public class ReRegistrationPipeline {
return isNotRelocation;
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
+ return Flux.empty();
}
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -283,12 +280,12 @@ public class ReRegistrationPipeline {
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
- String attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
- String remoteId = state.getReRegistrationEvent().getRemoteId();
- String cvlan = state.getReRegistrationEvent().getCVlan();
- String svlan = state.getReRegistrationEvent().getSVlan();
+ var attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
+ var remoteId = state.getReRegistrationEvent().getRemoteId();
+ var cvlan = state.getReRegistrationEvent().getCVlan();
+ var svlan = state.getReRegistrationEvent().getSVlan();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
index 5fbb0875..882635ce 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
@@ -96,7 +96,7 @@ public class Scheduler implements ConfigurationChangeObserver {
LOGGER.info("BBS event processing pipelines will start in {} seconds "
+ "and will run periodically every {} seconds", PIPELINES_INITIAL_DELAY_IN_SECONDS,
currentPipelinesPollingInterval);
- Instant desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
+ var desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
scheduleProcessingTasks(desiredStartTime, currentPipelinesPollingInterval);
// Register for configuration changes
@@ -118,7 +118,7 @@ public class Scheduler implements ConfigurationChangeObserver {
cancelScheduledProcessingTasks();
reScheduleProcessingTasks();
}
- int newCbsPollingInterval = configuration.getCbsPollingInterval();
+ var newCbsPollingInterval = configuration.getCbsPollingInterval();
if (newCbsPollingInterval != currentCbsPollingInterval) {
if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) {
LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job",
@@ -222,9 +222,9 @@ public class Scheduler implements ConfigurationChangeObserver {
}
private int validatePipelinesPollingInterval() {
- int pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
- boolean isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
+ var pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
+ var isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
if (isSmallInterval) {
LOGGER.warn("Pipelines Polling interval is too small ({}). Defaulting to {}", pipelinesPollingInterval,
DEFAULT_PIPELINES_POLLING_INTERVAL);
@@ -233,9 +233,9 @@ public class Scheduler implements ConfigurationChangeObserver {
}
private int verifyCbsPollingInterval() {
- int cbsPollingInterval = configuration.getCbsPollingInterval();
- boolean isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
+ var cbsPollingInterval = configuration.getCbsPollingInterval();
+ var isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
if (isSmallInterval) {
LOGGER.warn("CBS Polling interval is too small ({}). Defaulting to {}", cbsPollingInterval,
DEFAULT_CBS_POLLING_INTERVAL);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index da510281..153cb91b 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,59 +20,61 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapCpeAuthenticationConsumerTaskImpl
- implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver {
+public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask,
+ ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
- new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
-
- private DMaaPConsumerReactiveHttpClient httpClient;
+ new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
@Autowired
- public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration,
- CpeAuthenticationDmaapConsumerJsonParser
- cpeAuthenticationDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException {
+ @Qualifier("CpeAuthMessageRouterSubscriber")
+ MessageRouterSubscriber subscriber,
+ CpeAuthenticationDmaapConsumerJsonParser parser) {
+ this.cpeAuthenticationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
-
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +89,25 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@Override
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest)
+ .flatMap(jsonElement ->
+ cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +115,4 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index 749c4e53..dec1dbcd 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
public interface DmaapPublisherTask {
- Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 283e5ef9..6c50b10d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,48 @@
package org.onap.bbs.event.processor.tasks;
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private ApplicationConfiguration configuration;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- private DMaaPPublisherReactiveHttpClient httpClient;
+ private ApplicationConfiguration configuration;
+ private MessageRouterPublisher publisher;
+ private String publishUrl;
+ private MessageRouterPublishRequest publishRequest;
@Autowired
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
- this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
- new ControlLoopJsonBodyBuilder()));
- }
-
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) {
this.configuration = configuration;
- this.httpClientFactory = httpClientFactory;
-
- try {
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ this.publisher = publisher;
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ this.configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapProducerProperties().getDmaapHostName(),
+ this.configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@PostConstruct
@@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- try {
- LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ publisher =
+ DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ configuration.getDmaapProducerProperties().getDmaapHostName(),
+ configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@Override
- public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
- if (controlLoopPublisherDmaapModel == null) {
+ public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) {
+ if (event == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message");
- LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- }
-
- private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
- return httpClient;
+ LOGGER.debug("CL message \n{}", event);
+ return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event)));
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index e40037b1..aff563c5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,25 +20,23 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@@ -49,30 +47,33 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP");
- private DMaaPConsumerReactiveHttpClient httpClient;
-
@Autowired
- public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new ReRegistrationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration,
- ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory)
- throws SSLException {
+ @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber,
+ ReRegistrationDmaapConsumerJsonParser parser) {
+ this.reRegistrationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
-
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +88,25 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapReRegistrationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@Override
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest)
+ .flatMap(jsonElement ->
+ reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +114,4 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 84fc9f7d..c0e9b1c4 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -20,6 +20,8 @@
package org.onap.bbs.event.processor.utilities;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import com.google.gson.Gson;
@@ -27,9 +29,10 @@ import com.google.gson.JsonSyntaxException;
import io.netty.handler.ssl.SslContext;
+import java.nio.file.Paths;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.AaiClientConfiguration;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
@@ -37,7 +40,9 @@ import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.AaiTaskException;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
-import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
private AaiClientConfiguration aaiClientConfiguration;
@Autowired
- AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException {
+ AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) {
this.configuration = configuration;
this.gson = gson;
this.sslFactory = new SslFactory();
@@ -85,25 +90,20 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
@Override
public void updateConfiguration() {
- AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration();
+ var newConfiguration = configuration.getAaiClientConfiguration();
if (aaiClientConfiguration.equals(newConfiguration)) {
LOGGER.info("No Configuration changes necessary for AAI Reactive client");
} else {
synchronized (this) {
LOGGER.info("AAI Reactive client must be re-configured");
aaiClientConfiguration = newConfiguration;
- try {
- setupWebClient();
- } catch (SSLException e) {
- LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient");
- LOGGER.debug("SSL Exception\n", e);
- }
+ setupWebClient();
}
}
}
- private void setupWebClient() throws SSLException {
- SslContext sslContext = createSslContext();
+ private void setupWebClient() {
+ var sslContext = createSslContext();
ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
@@ -132,7 +132,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) {
LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName());
- WebClient webClient = getWebClient();
+ var webClient = getWebClient();
return webClient
.get()
.uri(url)
@@ -179,17 +179,18 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
});
}
- private SslContext createSslContext() throws SSLException {
+ private SslContext createSslContext() {
if (aaiClientConfiguration.enableAaiCertAuth()) {
LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
- return sslFactory.createSecureContext(
- aaiClientConfiguration.keyStorePath(),
- aaiClientConfiguration.keyStorePasswordPath(),
- aaiClientConfiguration.trustStorePath(),
- aaiClientConfiguration.trustStorePasswordPath()
- );
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath())))
+ .build();
+ return sslFactory.createSecureClientContext(securityKeys);
}
- return sslFactory.createInsecureContext();
+ return sslFactory.createInsecureClientContext();
}
private synchronized WebClient getWebClient() {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
index c2b67348..b3fa09db 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.utilities;
-public class CommonEventFields {
+class CommonEventFields {
static final String COMMON_FORMAT = "\": \"%s\"";
static final String EVENT = "event";
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
index 03ea0936..f27cca5e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
@@ -21,24 +21,23 @@
package org.onap.bbs.event.processor.utilities;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
import com.google.gson.TypeAdapterFactory;
import java.util.ServiceLoader;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> {
+public class ControlLoopJsonBodyBuilder {
/**
* Serialize the Control Loop DMaaP model with GSON.
* @param publisherDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder()
.closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
@@ -56,4 +55,30 @@ public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPu
.originator(publisherDmaapModel.getOriginator())
.build());
}
+
+ /**
+ * Serialize the Control Loop DMaaP model with GSON.
+ * @param publisherDmaapModel object to be serialized
+ * @return String output of serialization
+ */
+ public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) {
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder()
+ .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
+ .policyVersion(publisherDmaapModel.getPolicyVersion())
+ .policyName(publisherDmaapModel.getPolicyName())
+ .policyScope(publisherDmaapModel.getPolicyScope())
+ .targetType(publisherDmaapModel.getTargetType())
+ .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData())
+ .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart())
+ .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus())
+ .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName())
+ .version(publisherDmaapModel.getVersion())
+ .target(publisherDmaapModel.getTarget())
+ .requestId(publisherDmaapModel.getRequestId())
+ .originator(publisherDmaapModel.getOriginator())
+ .build());
+ }
+
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
index 3cff4e65..5a0466ac 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
@@ -46,11 +46,13 @@ import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class CpeAuthenticationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
@@ -108,9 +110,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
return Mono.empty();
}
- JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
- JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(STATE_CHANGE_FIELDS);
pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
@@ -120,7 +122,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
- JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
+ var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
swVersion = getValueFromJson(additionalFields, SW_VERSION);
}
@@ -128,7 +130,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
if (StringUtils.isEmpty(pnfSourceName)
|| authenticationStatusMissing(oldAuthenticationStatus)
|| authenticationStatusMissing(newAuthenticationStatus)) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -164,7 +166,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
index 09aa7303..df8eaa40 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
@@ -27,18 +27,16 @@ import java.util.ServiceLoader;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> {
+public class CpeAuthenticationJsonBodyBuilder {
/**
* Serialize the CPE authentication DMaaP model with GSON.
* @param cpeAuthenticationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId())
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
new file mode 100644
index 00000000..be8bea09
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.bbs.event.processor.utilities;
+
+import java.nio.file.Paths;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+public class GenericUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class);
+
+ private GenericUtils() {}
+
+ /**
+ * Creates Message Router subscription request.
+ * @param topicUrl URL of topic to use
+ * @param consumerGroup Consumer Group for subscription
+ * @param consumerId Consumer ID for subscription
+ * @return request based on provided input
+ */
+ public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ var sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("Subscriber source")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ /**
+ * Creates Message Router publish request.
+ * @param topicUrl URL of topic to use
+ * @return request based on provided input
+ */
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .topicUrl(topicUrl)
+ .name("Producer sink")
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.APPLICATION_JSON)
+ .build();
+ }
+
+ /**
+ * Creates a security key store for HTTPS.
+ * @param resource identifying the resource from which we will read security information
+ * @return store that will be used for HTTPS
+ */
+ @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) {
+ if (StringUtils.isEmpty(resource)) {
+ throw new ApplicationEnvironmentException("Resource for security key store is empty");
+ }
+ var path = Paths.get(resource);
+ LOGGER.info("Reading keys from {}", path.toString());
+ return SecurityKeysStore.fromPath(path);
+ }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 9fe0c277..5e249e57 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -41,11 +41,13 @@ import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapMo
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class ReRegistrationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
@@ -101,7 +103,7 @@ public class ReRegistrationDmaapConsumerJsonParser {
return Mono.empty();
}
- JsonObject pnfReRegistrationFields =
+ var pnfReRegistrationFields =
dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
@@ -112,7 +114,7 @@ public class ReRegistrationDmaapConsumerJsonParser {
svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -148,7 +150,7 @@ public class ReRegistrationDmaapConsumerJsonParser {
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
index 867cfda7..bf90a4c5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
@@ -27,18 +27,16 @@ import java.util.ServiceLoader;
import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> {
+public class ReRegistrationJsonBodyBuilder {
/**
* Serialize the Re-Registration DMaaP model with GSON.
* @param reRegistrationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(reRegistrationConsumerDmaapModel.getCorrelationId())