diff options
author | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2020-02-28 21:53:04 +0200 |
---|---|---|
committer | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2020-03-03 17:30:22 +0200 |
commit | 22025d4ddfcccd86a2f93be7dadea9735e4b4528 (patch) | |
tree | 6b12e0770f65cb6a7389c0300fe95454ac1717fb /components/bbs-event-processor/src/main/java/org/onap/bbs | |
parent | 82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff) |
Upgrade of BBS-ep service
Upgraded service to use latest DCAE-SDK
Upgraded many of the dependencies to latest versions
Introduced Java 11
Change-Id: I29d265d2a75aa80749f567cfb10920b2c45c2cec
Issue-ID: DCAEGEN2-2105
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs')
25 files changed, 522 insertions, 411 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java index 66e1f86d..94b955f1 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java @@ -71,7 +71,7 @@ public class Application { @Bean TaskScheduler threadPoolTaskScheduler() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + var scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(THREADS_IN_POOL); scheduler.setThreadNamePrefix("pipeline-thrd-"); return scheduler; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java index 5022a693..33a935e5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java @@ -21,7 +21,10 @@ package org.onap.bbs.event.processor.config; import static org.onap.bbs.event.processor.config.ApplicationConstants.STREAMS_TYPE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource; +import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath; +import java.nio.file.Paths; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -31,10 +34,12 @@ import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException; import org.onap.bbs.event.processor.exceptions.ConfigurationParsingException; import org.onap.bbs.event.processor.model.GeneratedAppConfigObject; import org.onap.bbs.event.processor.utilities.LoggingUtil; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -48,9 +53,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { private final SecurityProperties securityProperties; private final GenericProperties genericProperties; - private DmaapConsumerConfiguration dmaapReRegistrationConsumerConfiguration; - private DmaapConsumerConfiguration dmaapCpeAuthenticationConsumerConfiguration; - private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private MessageRouterSubscriberConfig dmaapReRegistrationConsumerConfiguration; + private MessageRouterSubscriberConfig dmaapCpeAuthenticationConsumerConfiguration; + private MessageRouterPublisherConfig dmaapPublisherConfiguration; private AaiClientConfiguration aaiClientConfiguration; private Set<ConfigurationChangeObserver> observers; @@ -97,15 +102,15 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { observers.forEach(ConfigurationChangeObserver::updateConfiguration); } - public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() { + public synchronized MessageRouterSubscriberConfig getDmaapReRegistrationConsumerConfiguration() { return dmaapReRegistrationConsumerConfiguration; } - public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() { + public synchronized MessageRouterSubscriberConfig getDmaapCpeAuthenticationConsumerConfiguration() { return dmaapCpeAuthenticationConsumerConfiguration; } - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized MessageRouterPublisherConfig getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } @@ -117,6 +122,18 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { return genericProperties.getPipelinesPollingIntervalSec(); } + public synchronized DmaapProducerProperties getDmaapProducerProperties() { + return dmaapProducerProperties; + } + + public synchronized DmaapReRegistrationConsumerProperties getDmaapReRegistrationConsumerProperties() { + return dmaapReRegistrationConsumerProperties; + } + + public synchronized DmaapCpeAuthenticationConsumerProperties getDmaapCpeAuthenticationConsumerProperties() { + return dmaapCpeAuthenticationConsumerProperties; + } + public synchronized int getPipelinesTimeoutInSeconds() { return genericProperties.getPipelinesTimeoutSec(); } @@ -180,11 +197,17 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath()); securityProperties.setTrustStorePath(newConfiguration.trustStorePath()); securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath()); - - GeneratedAppConfigObject.StreamsObject reRegObject = + final SecurityKeys securityKeys = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + + var reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(), "PNF Re-Registration"); - TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); + var topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); @@ -196,9 +219,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapReRegistrationConfiguration(); + constructDmaapReRegistrationConfiguration(securityKeys); - GeneratedAppConfigObject.StreamsObject cpeAuthObject = + var cpeAuthObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(), "CPE Authentication"); topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl()); @@ -213,9 +236,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapCpeAuthenticationConfiguration(); + constructDmaapCpeAuthenticationConfiguration(securityKeys); - GeneratedAppConfigObject.StreamsObject closeLoopObject = + var closeLoopObject = getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(), "Close Loop"); topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl()); @@ -226,7 +249,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword()); dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); - constructDmaapProducerConfiguration(); + constructDmaapProducerConfiguration(securityKeys); aaiClientProperties.setAaiHost(newConfiguration.aaiHost()); aaiClientProperties.setAaiPort(newConfiguration.aaiPort()); @@ -258,7 +281,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { @NotNull private GeneratedAppConfigObject.StreamsObject getStreamsObject( Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) { - GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey); + var streamsObject = map.get(configKey); if (!STREAMS_TYPE.equals(streamsObject.type())) { throw new ApplicationEnvironmentException(String.format("%s requires information about" + " message-router topic in ONAP", messageName)); @@ -267,80 +290,45 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { } private void constructConfigurationObjects() { - constructDmaapReRegistrationConfiguration(); - constructDmaapCpeAuthenticationConfiguration(); - constructDmaapProducerConfiguration(); + final SecurityKeys securityKeysForReRegistration = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapReRegistrationConfiguration(securityKeysForReRegistration); + final SecurityKeys securityKeysForCpeAuthentication = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapCpeAuthenticationConfiguration(securityKeysForCpeAuthentication); + final SecurityKeys securityKeysForProducer = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapProducerConfiguration(securityKeysForProducer); constructAaiConfiguration(); } - private void constructDmaapReRegistrationConfiguration() { - dmaapReRegistrationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapHostName(dmaapReRegistrationConsumerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapReRegistrationConsumerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapReRegistrationConsumerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapReRegistrationConsumerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapReRegistrationConsumerProperties.getDmaapUserName() == null ? "" : - dmaapReRegistrationConsumerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapReRegistrationConsumerProperties.getDmaapUserPassword() == null ? "" : - dmaapReRegistrationConsumerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapReRegistrationConsumerProperties.getDmaapContentType()) - .consumerId(dmaapReRegistrationConsumerProperties.getConsumerId()) - .consumerGroup(dmaapReRegistrationConsumerProperties.getConsumerGroup()) - .timeoutMs(dmaapReRegistrationConsumerProperties.getTimeoutMs()) - .messageLimit(dmaapReRegistrationConsumerProperties.getMessageLimit()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapReRegistrationConfiguration(final SecurityKeys securityKeys) { + dmaapReRegistrationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(securityKeys) .build(); } - private void constructDmaapCpeAuthenticationConfiguration() { - dmaapCpeAuthenticationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapHostName(dmaapCpeAuthenticationConsumerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapCpeAuthenticationConsumerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapCpeAuthenticationConsumerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapCpeAuthenticationConsumerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapCpeAuthenticationConsumerProperties.getDmaapUserName() == null ? "" : - dmaapCpeAuthenticationConsumerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword() == null ? "" : - dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapCpeAuthenticationConsumerProperties.getDmaapContentType()) - .consumerId(dmaapCpeAuthenticationConsumerProperties.getConsumerId()) - .consumerGroup(dmaapCpeAuthenticationConsumerProperties.getConsumerGroup()) - .timeoutMs(dmaapCpeAuthenticationConsumerProperties.getTimeoutMs()) - .messageLimit(dmaapCpeAuthenticationConsumerProperties.getMessageLimit()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapCpeAuthenticationConfiguration(final SecurityKeys securityKeys) { + dmaapCpeAuthenticationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(securityKeys) .build(); } - private void constructDmaapProducerConfiguration() { - dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapHostName(dmaapProducerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapProducerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapProducerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapProducerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapProducerProperties.getDmaapUserName() == null ? "" : - dmaapProducerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapProducerProperties.getDmaapUserPassword() == null ? "" : - dmaapProducerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapProducerProperties.getDmaapContentType()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapProducerConfiguration(final SecurityKeys securityKeys) { + dmaapPublisherConfiguration = ImmutableMessageRouterPublisherConfig.builder() + .securityKeys(securityKeys) .build(); } @@ -362,19 +350,19 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { } private TopicUrlInfo parseTopicUrl(String topicUrl) { - String[] urlTokens = topicUrl.split(":"); + var urlTokens = topicUrl.split(":"); if (urlTokens.length != 3) { throw new ConfigurationParsingException("Wrong topic URL format"); } - TopicUrlInfo topicUrlInfo = new TopicUrlInfo(); + var topicUrlInfo = new TopicUrlInfo(); topicUrlInfo.setHost(urlTokens[1].replace("/", "")); - String[] tokensAfterHost = urlTokens[2].split("/events/"); + var tokensAfterHost = urlTokens[2].split("/events/"); if (tokensAfterHost.length != 2) { throw new ConfigurationParsingException("Wrong topic name structure"); } topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0])); - topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]); + topicUrlInfo.setTopicName(tokensAfterHost[1]); return topicUrlInfo; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java index d2ac3c10..7be6bdea 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java @@ -35,5 +35,9 @@ public class ApplicationConstants { // Close Loop Constants public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance"; + // Subscribe for DMaaP + public static final String SUBSCRIBE_URL_TEMPLATE = "%s://%s:%d/events/%s"; + public static final String PUBLISH_URL_TEMPLATE = "%s://%s:%d/events/%s"; + private ApplicationConstants() {} }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java index 607b3b31..4ac85495 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java @@ -40,8 +40,7 @@ import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableStreamsObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,9 +84,9 @@ public class ConsulConfigurationGateway { } boolean environmentNotReady() { - String consulHost = System.getenv().get(CONSUL_HOST); - String cbs = System.getenv().get(CONFIG_BINDING_SERVICE); - String hostname = System.getenv().get(HOSTNAME); + var consulHost = System.getenv().get(CONSUL_HOST); + var cbs = System.getenv().get(CONFIG_BINDING_SERVICE); + var hostname = System.getenv().get(HOSTNAME); return consulHost == null || cbs == null || hostname == null; } @@ -106,7 +105,7 @@ public class ConsulConfigurationGateway { private void parseConsulRetrievedConfiguration(JsonObject jsonObject) { - GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject); + var generatedAppConfigObject = generateAppConfigObject(jsonObject); LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject); configuration.updateCurrentConfiguration(generatedAppConfigObject); } @@ -124,10 +123,10 @@ public class ConsulConfigurationGateway { RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) - EnvProperties env = EnvProperties.fromEnvironment(); - CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext); + var cbsClientConfig = CbsClientConfiguration.fromEnvironment(); + var cbsRequest = CbsRequests.getConfiguration(diagnosticContext); // Create the client and use it to get the configuration - cbsFetchPipeline = CbsClientFactory.createCbsClient(env) + cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig) .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) .retry(e -> true) .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period)) @@ -138,57 +137,57 @@ public class ConsulConfigurationGateway { GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) { if (LOGGER.isInfoEnabled()) { - String configAsString = gson.toJson(configObject); + var configAsString = gson.toJson(configObject); LOGGER.info("Received App Config object\n{}", configAsString); } - final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString(); - final String dmaapContentType = configObject.get("dmaap.contentType").getAsString(); - final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString(); - final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString(); - final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt(); - final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt(); - - final String aaiHost = configObject.get("aai.host").getAsString(); - final int aaiPort = configObject.get("aai.port").getAsInt(); - final String aaiProtocol = configObject.get("aai.protocol").getAsString(); - final String aaiUsername = configObject.get("aai.username").getAsString(); - final String aaiPassword = configObject.get("aai.password").getAsString(); - final boolean aaiIgnoreSslCertificateErrors = + final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString(); + final var dmaapContentType = configObject.get("dmaap.contentType").getAsString(); + final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString(); + final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString(); + final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt(); + final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt(); + + final var aaiHost = configObject.get("aai.host").getAsString(); + final var aaiPort = configObject.get("aai.port").getAsInt(); + final var aaiProtocol = configObject.get("aai.protocol").getAsString(); + final var aaiUsername = configObject.get("aai.username").getAsString(); + final var aaiPassword = configObject.get("aai.password").getAsString(); + final var aaiIgnoreSslCertificateErrors = configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean(); - final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt(); - final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt(); - final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt(); + final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt(); + final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt(); + final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt(); - final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString(); - final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString(); - final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString(); - final String cpeAuthClControlName = + final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString(); + final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString(); + final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString(); + final var cpeAuthClControlName = configObject.get("application.cpe.authentication.clControlName").getAsString(); - final String policyVersion = configObject.get("application.policyVersion").getAsString(); - final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString(); - final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString(); - final String closeLoopVersion = configObject.get("application.clVersion").getAsString(); - final String closeLoopTarget = configObject.get("application.clTarget").getAsString(); - final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString(); + final var policyVersion = configObject.get("application.policyVersion").getAsString(); + final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString(); + final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString(); + final var closeLoopVersion = configObject.get("application.clVersion").getAsString(); + final var closeLoopTarget = configObject.get("application.clTarget").getAsString(); + final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString(); - final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString(); - final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString(); - final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString(); + final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString(); + final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString(); + final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString(); - final String loggingLevel = configObject.get("application.loggingLevel").getAsString(); + final var loggingLevel = configObject.get("application.loggingLevel").getAsString(); - final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); - final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); - final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); - final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); - final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); - final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); + final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); + final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); + final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); + final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); + final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); + final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); - final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes"); - final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); + final var streamsPublishes = configObject.getAsJsonObject("streams_publishes"); + final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); return ImmutableGeneratedAppConfigObject.builder() .dmaapProtocol(dmaapProtocol) @@ -259,22 +258,22 @@ public class ConsulConfigurationGateway { private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject( Map.Entry<String, JsonElement> jsonEntry) { - JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue(); + var closeLoopOutput = (JsonObject) jsonEntry.getValue(); - String type = closeLoopOutput.get("type").getAsString(); - String aafUsername = closeLoopOutput.get("aaf_username") != null + var type = closeLoopOutput.get("type").getAsString(); + var aafUsername = closeLoopOutput.get("aaf_username") != null ? closeLoopOutput.get("aaf_username").getAsString() : ""; - String aafPassword = closeLoopOutput.get("aaf_password") != null + var aafPassword = closeLoopOutput.get("aaf_password") != null ? closeLoopOutput.get("aaf_password").getAsString() : ""; - JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info"); - String clientId = dmaapInfo.get("client_id") != null + var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info"); + var clientId = dmaapInfo.get("client_id") != null ? dmaapInfo.get("client_id").getAsString() : ""; - String clientRole = dmaapInfo.get("client_role") != null + var clientRole = dmaapInfo.get("client_role") != null ? dmaapInfo.get("client_role").getAsString() : ""; - String location = dmaapInfo.get("location") != null + var location = dmaapInfo.get("location") != null ? dmaapInfo.get("location").getAsString() : ""; - String topicUrl = dmaapInfo.get("topic_url").getAsString(); + var topicUrl = dmaapInfo.get("topic_url").getAsString(); GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder() .clientId(clientId) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java new file mode 100644 index 00000000..b0ecfd51 --- /dev/null +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= +*/ + +package org.onap.bbs.event.processor.config; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MessageRouterConfig { + + @Bean(name = "ReRegMessageRouterSubscriber") + public MessageRouterSubscriber reRegistrationMessageRouterSubscriber(ApplicationConfiguration configuration) { + return DmaapClientFactory + .createMessageRouterSubscriber(configuration.getDmaapReRegistrationConsumerConfiguration()); + } + + @Bean(name = "CpeAuthMessageRouterSubscriber") + public MessageRouterSubscriber registrationMessageRouterSubscriber(ApplicationConfiguration configuration) { + return DmaapClientFactory + .createMessageRouterSubscriber(configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + } + + @Bean + public MessageRouterPublisher mrPub(ApplicationConfiguration configuration) { + return DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + } +} diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java index 1be4f43a..e2e49797 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java @@ -22,10 +22,6 @@ package org.onap.bbs.event.processor.exceptions; public class EmptyDmaapResponseException extends RuntimeException { - public EmptyDmaapResponseException() { - super(); - } - public EmptyDmaapResponseException(String message) { super(message); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java index 8fa73e42..c952e9bd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java @@ -26,11 +26,10 @@ import java.util.Map; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface ControlLoopPublisherDmaapModel extends DmaapModel { +public interface ControlLoopPublisherDmaapModel { @SerializedName(value = "closedLoopEventClient", alternate = "closedLoopEventClient") String getClosedLoopEventClient(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java index 42c9896f..50ed66ef 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java @@ -26,12 +26,10 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface CpeAuthenticationConsumerDmaapModel extends AaiModel, DmaapModel { +public interface CpeAuthenticationConsumerDmaapModel { @SerializedName(value = "correlationId", alternate = "correlationId") String getCorrelationId(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java index 682c0641..f64f30e5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java @@ -26,11 +26,10 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true, emptyAsNulls = true) -public interface PnfAaiObject extends ClientModel { +public interface PnfAaiObject { @SerializedName(value = "pnf-name", alternate = "pnf-name") String getPnfName(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java index 07fb75aa..3f58aa33 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java @@ -24,12 +24,10 @@ import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface ReRegistrationConsumerDmaapModel extends AaiModel, DmaapModel { +public interface ReRegistrationConsumerDmaapModel { @SerializedName(value = "correlationId", alternate = "correlationId") String getCorrelationId(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java index a30903bb..2cab017f 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java @@ -25,13 +25,11 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_ import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -42,14 +40,13 @@ import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.bbs.event.processor.model.MetadataListAaiObject; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.RelationshipListAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -99,7 +96,7 @@ public class CpeAuthenticationPipeline { LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started"); } - Flux<HttpResponse> executePipeline() { + Flux<MessageRouterPublishResponse> executePipeline() { return // Consume CPE Authentication from DMaaP consumeCpeAuthenticationFromDmaap() @@ -111,12 +108,12 @@ public class CpeAuthenticationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(HttpResponse responseCode) { - MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); - LOGGER.info("CPE Authentication event successfully handled. " - + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.statusCode(), responseCode.statusReason()); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + LOGGER.info("CPE Authentication event successfully handled. Published Policy event to DMaaP"); + } else { + LOGGER.error("CPE Authentication event handling error [{}]", response.failReason()); + } } private void onError(Throwable throwable) { @@ -149,7 +146,7 @@ public class CpeAuthenticationPipeline { .map(event -> { // For each message, we have to keep separate state. This state will be enhanced // in each step and handed off to the next processing step - PipelineState state = new PipelineState(); + var state = new PipelineState(); state.setCpeAuthenticationEvent(event); return state; }); @@ -161,9 +158,9 @@ public class CpeAuthenticationPipeline { private Mono<PipelineState> fetchPnfFromAai(PipelineState state) { - CpeAuthenticationConsumerDmaapModel vesEvent = state.getCpeAuthenticationEvent(); - String pnfName = vesEvent.getCorrelationId(); - String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); + var vesEvent = state.getCpeAuthenticationEvent(); + var pnfName = vesEvent.getCorrelationId(); + var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url); return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url) @@ -191,10 +188,10 @@ public class CpeAuthenticationPipeline { return Mono.empty(); } - PnfAaiObject pnf = state.getPnfAaiObject(); + var pnf = state.getPnfAaiObject(); // Assuming that the PNF will only have a single service-instance relationship pointing // towards the HSI CFS service - String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() + var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -208,7 +205,7 @@ public class CpeAuthenticationPipeline { return Mono.empty(); } - String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", serviceInstanceId); LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url); return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url) @@ -230,21 +227,13 @@ public class CpeAuthenticationPipeline { }); } - private Mono<HttpResponse> triggerPolicy(PipelineState state) { + private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { - return Mono.empty(); - } - - // At this point, we must check if the PNF RGW MAC address matches the value extracted from VES event - if (!isCorrectMacAddress(state)) { - LOGGER.warn("Processing stopped. RGW MAC address taken from event ({}) " - + "does not match with A&AI metadata corresponding value", - state.getCpeAuthenticationEvent().getRgwMacAddress()); - return Mono.empty(); + return Flux.empty(); } - ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state); + var event = buildTriggeringPolicyEvent(state); return publisherTask.execute(event) .timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds())) .doOnError(TimeoutException.class, @@ -256,30 +245,15 @@ public class CpeAuthenticationPipeline { e -> Mono.empty()); } - - private boolean isCorrectMacAddress(PipelineState state) { - // We need to check if the RGW MAC address received in VES event matches the one found in - // HSIA CFS service (in its metadata section) - Optional<MetadataListAaiObject> optionalMetadata = state.getHsiCfsServiceInstance() - .getMetadataListAaiObject(); - String eventRgwMacAddress = state.getCpeAuthenticationEvent().getRgwMacAddress().orElse(""); - return optionalMetadata - .map(list -> list.getMetadataEntries() - .stream() - .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname()) - && m.getMetavalue().equals(eventRgwMacAddress))) - .orElse(false); - } - private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) { - String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); + var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); Map<String, String> enrichmentData = new HashMap<>(); enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId); enrichmentData.put("cpe.old-authentication-state", state.cpeAuthenticationEvent.getOldAuthenticationState()); enrichmentData.put("cpe.new-authentication-state", state.cpeAuthenticationEvent.getNewAuthenticationState()); - String swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse(""); + var swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse(""); if (!StringUtils.isEmpty(swVersion)) { enrichmentData.put("cpe.swVersion", swVersion); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java index 33a9aea7..0bc5e353 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java @@ -25,12 +25,10 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_ import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; import java.time.Duration; import java.time.Instant; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -48,7 +46,7 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -98,7 +96,7 @@ public class ReRegistrationPipeline { LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started"); } - Flux<HttpResponse> executePipeline() { + Flux<MessageRouterPublishResponse> executePipeline() { return // Consume Re-Registration from DMaaP consumeReRegistrationsFromDmaap() @@ -110,12 +108,12 @@ public class ReRegistrationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(HttpResponse responseCode) { - MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); - LOGGER.info("PNF Re-Registration event successfully handled. " - + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.statusCode(), responseCode.statusReason()); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + LOGGER.info("PNF Re-Registration event successfully handled. Published Policy event to DMaaP"); + } else { + LOGGER.error("PNF Re-Registration event handling error [{}]", response.failReason()); + } } private void onError(Throwable throwable) { @@ -148,7 +146,7 @@ public class ReRegistrationPipeline { .map(event -> { // For each message, we have to keep separate state. This state will be enhanced // in each step and handed off to the next processing step - PipelineState state = new PipelineState(); + var state = new PipelineState(); state.setReRegistrationEvent(event); return state; }); @@ -160,9 +158,9 @@ public class ReRegistrationPipeline { private Mono<PipelineState> fetchPnfFromAai(PipelineState state) { - ReRegistrationConsumerDmaapModel vesEvent = state.getReRegistrationEvent(); - String pnfName = vesEvent.getCorrelationId(); - String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); + var vesEvent = state.getReRegistrationEvent(); + var pnfName = vesEvent.getCorrelationId(); + var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url); return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url) @@ -196,10 +194,10 @@ public class ReRegistrationPipeline { return Mono.empty(); } - PnfAaiObject pnf = state.getPnfAaiObject(); + var pnf = state.getPnfAaiObject(); // Assuming that the PNF will only have a single service-instance relationship pointing // towards the HSI CFS service - String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() + var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -213,7 +211,7 @@ public class ReRegistrationPipeline { return Mono.empty(); } - String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", serviceInstanceId); LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url); return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url) @@ -236,8 +234,7 @@ public class ReRegistrationPipeline { } private boolean isNotReallyAnOntRelocation(PipelineState state) { - List<RelationshipListAaiObject.RelationshipEntryAaiObject> relationshipEntries = - state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries(); + var relationshipEntries = state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries(); // If no logical-link, fail further processing if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) { @@ -247,7 +244,7 @@ public class ReRegistrationPipeline { } // Assuming PNF will only have one logical-link per BBS use case design - boolean isNotRelocation = relationshipEntries + var isNotRelocation = relationshipEntries .stream() .filter(e -> "logical-link".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -263,13 +260,13 @@ public class ReRegistrationPipeline { return isNotRelocation; } - private Mono<HttpResponse> triggerPolicy(PipelineState state) { + private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { - return Mono.empty(); + return Flux.empty(); } - ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state); + var event = buildTriggeringPolicyEvent(state); return publisherTask.execute(event) .timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds())) .doOnError(TimeoutException.class, @@ -283,12 +280,12 @@ public class ReRegistrationPipeline { private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) { - String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); + var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); - String attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint(); - String remoteId = state.getReRegistrationEvent().getRemoteId(); - String cvlan = state.getReRegistrationEvent().getCVlan(); - String svlan = state.getReRegistrationEvent().getSVlan(); + var attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint(); + var remoteId = state.getReRegistrationEvent().getRemoteId(); + var cvlan = state.getReRegistrationEvent().getCVlan(); + var svlan = state.getReRegistrationEvent().getSVlan(); Map<String, String> enrichmentData = new HashMap<>(); enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java index 5fbb0875..882635ce 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java @@ -96,7 +96,7 @@ public class Scheduler implements ConfigurationChangeObserver { LOGGER.info("BBS event processing pipelines will start in {} seconds " + "and will run periodically every {} seconds", PIPELINES_INITIAL_DELAY_IN_SECONDS, currentPipelinesPollingInterval); - Instant desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS); + var desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS); scheduleProcessingTasks(desiredStartTime, currentPipelinesPollingInterval); // Register for configuration changes @@ -118,7 +118,7 @@ public class Scheduler implements ConfigurationChangeObserver { cancelScheduledProcessingTasks(); reScheduleProcessingTasks(); } - int newCbsPollingInterval = configuration.getCbsPollingInterval(); + var newCbsPollingInterval = configuration.getCbsPollingInterval(); if (newCbsPollingInterval != currentCbsPollingInterval) { if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) { LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job", @@ -222,9 +222,9 @@ public class Scheduler implements ConfigurationChangeObserver { } private int validatePipelinesPollingInterval() { - int pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds(); - boolean isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL; - int verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval; + var pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds(); + var isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL; + var verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval; if (isSmallInterval) { LOGGER.warn("Pipelines Polling interval is too small ({}). Defaulting to {}", pipelinesPollingInterval, DEFAULT_PIPELINES_POLLING_INTERVAL); @@ -233,9 +233,9 @@ public class Scheduler implements ConfigurationChangeObserver { } private int verifyCbsPollingInterval() { - int cbsPollingInterval = configuration.getCbsPollingInterval(); - boolean isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL; - int verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval; + var cbsPollingInterval = configuration.getCbsPollingInterval(); + var isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL; + var verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval; if (isSmallInterval) { LOGGER.warn("CBS Polling interval is too small ({}). Defaulting to {}", cbsPollingInterval, DEFAULT_CBS_POLLING_INTERVAL); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index da510281..153cb91b 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,59 +20,61 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Component -public class DmaapCpeAuthenticationConsumerTaskImpl - implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver { +public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask, + ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = - new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); - - private DMaaPConsumerReactiveHttpClient httpClient; + new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); @Autowired - public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration, - CpeAuthenticationDmaapConsumerJsonParser - cpeAuthenticationDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException { + @Qualifier("CpeAuthMessageRouterSubscriber") + MessageRouterSubscriber subscriber, + CpeAuthenticationDmaapConsumerJsonParser parser) { + this.cpeAuthenticationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +89,25 @@ public class DmaapCpeAuthenticationConsumerTaskImpl @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @Override public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest) + .flatMap(jsonElement -> + cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +115,4 @@ public class DmaapCpeAuthenticationConsumerTaskImpl } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index 749c4e53..dec1dbcd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,11 +21,11 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; public interface DmaapPublisherTask { - Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); + Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 283e5ef9..6c50b10d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,54 +20,48 @@ package org.onap.bbs.event.processor.tasks; -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; @Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private ApplicationConfiguration configuration; - private final PublisherReactiveHttpClientFactory httpClientFactory; - private DMaaPPublisherReactiveHttpClient httpClient; + private ApplicationConfiguration configuration; + private MessageRouterPublisher publisher; + private String publishUrl; + private MessageRouterPublishRequest publishRequest; @Autowired - DmaapPublisherTaskImpl(ApplicationConfiguration configuration) { - this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), - new ControlLoopJsonBodyBuilder())); - } - - DmaapPublisherTaskImpl(ApplicationConfiguration configuration, - PublisherReactiveHttpClientFactory httpClientFactory) { + DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) { this.configuration = configuration; - this.httpClientFactory = httpClientFactory; - - try { - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + this.publisher = publisher; + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + this.configuration.getDmaapProducerProperties().getDmaapProtocol(), + this.configuration.getDmaapProducerProperties().getDmaapHostName(), + this.configuration.getDmaapProducerProperties().getDmaapPortNumber(), + this.configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @PostConstruct @@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - try { - LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + publisher = + DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + configuration.getDmaapProducerProperties().getDmaapProtocol(), + configuration.getDmaapProducerProperties().getDmaapHostName(), + configuration.getDmaapProducerProperties().getDmaapPortNumber(), + configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @Override - public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { - if (controlLoopPublisherDmaapModel == null) { + public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) { + if (event == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } LOGGER.info("Executing task for publishing control loop message"); - LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); - DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - } - - private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { - return httpClient; + LOGGER.debug("CL message \n{}", event); + return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event))); } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index e40037b1..aff563c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,25 +20,23 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @@ -49,30 +47,33 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP"); - private DMaaPConsumerReactiveHttpClient httpClient; - @Autowired - public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new ReRegistrationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration, - ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) - throws SSLException { + @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber, + ReRegistrationDmaapConsumerJsonParser parser) { + this.reRegistrationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +88,25 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapReRegistrationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @Override public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) + .flatMap(jsonElement -> + reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +114,4 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 84fc9f7d..c0e9b1c4 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -20,6 +20,8 @@ package org.onap.bbs.event.processor.utilities; +import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource; +import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import com.google.gson.Gson; @@ -27,9 +29,10 @@ import com.google.gson.JsonSyntaxException; import io.netty.handler.ssl.SslContext; +import java.nio.file.Paths; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.AaiClientConfiguration; import org.onap.bbs.event.processor.config.ApplicationConfiguration; @@ -37,7 +40,9 @@ import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.AaiTaskException; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; -import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -64,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private AaiClientConfiguration aaiClientConfiguration; @Autowired - AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException { + AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) { this.configuration = configuration; this.gson = gson; this.sslFactory = new SslFactory(); @@ -85,25 +90,20 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { @Override public void updateConfiguration() { - AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration(); + var newConfiguration = configuration.getAaiClientConfiguration(); if (aaiClientConfiguration.equals(newConfiguration)) { LOGGER.info("No Configuration changes necessary for AAI Reactive client"); } else { synchronized (this) { LOGGER.info("AAI Reactive client must be re-configured"); aaiClientConfiguration = newConfiguration; - try { - setupWebClient(); - } catch (SSLException e) { - LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient"); - LOGGER.debug("SSL Exception\n", e); - } + setupWebClient(); } } } - private void setupWebClient() throws SSLException { - SslContext sslContext = createSslContext(); + private void setupWebClient() { + var sslContext = createSslContext(); ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector( HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); @@ -132,7 +132,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) { LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName()); - WebClient webClient = getWebClient(); + var webClient = getWebClient(); return webClient .get() .uri(url) @@ -179,17 +179,18 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { }); } - private SslContext createSslContext() throws SSLException { + private SslContext createSslContext() { if (aaiClientConfiguration.enableAaiCertAuth()) { LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration); - return sslFactory.createSecureContext( - aaiClientConfiguration.keyStorePath(), - aaiClientConfiguration.keyStorePasswordPath(), - aaiClientConfiguration.trustStorePath(), - aaiClientConfiguration.trustStorePasswordPath() - ); + final SecurityKeys securityKeys = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath())) + .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath()))) + .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath())) + .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath()))) + .build(); + return sslFactory.createSecureClientContext(securityKeys); } - return sslFactory.createInsecureContext(); + return sslFactory.createInsecureClientContext(); } private synchronized WebClient getWebClient() { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java index c2b67348..b3fa09db 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java @@ -20,7 +20,7 @@ package org.onap.bbs.event.processor.utilities; -public class CommonEventFields { +class CommonEventFields { static final String COMMON_FORMAT = "\": \"%s\""; static final String EVENT = "event"; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java index 03ea0936..f27cca5e 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java @@ -21,24 +21,23 @@ package org.onap.bbs.event.processor.utilities; import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; import com.google.gson.TypeAdapterFactory; import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> { +public class ControlLoopJsonBodyBuilder { /** * Serialize the Control Loop DMaaP model with GSON. * @param publisherDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder() .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) @@ -56,4 +55,30 @@ public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPu .originator(publisherDmaapModel.getOriginator()) .build()); } + + /** + * Serialize the Control Loop DMaaP model with GSON. + * @param publisherDmaapModel object to be serialized + * @return String output of serialization + */ + public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) { + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder() + .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) + .policyVersion(publisherDmaapModel.getPolicyVersion()) + .policyName(publisherDmaapModel.getPolicyName()) + .policyScope(publisherDmaapModel.getPolicyScope()) + .targetType(publisherDmaapModel.getTargetType()) + .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData()) + .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart()) + .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus()) + .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName()) + .version(publisherDmaapModel.getVersion()) + .target(publisherDmaapModel.getTarget()) + .requestId(publisherDmaapModel.getRequestId()) + .originator(publisherDmaapModel.getOriginator()) + .build()); + } + } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java index 3cff4e65..5a0466ac 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java @@ -46,11 +46,13 @@ import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class CpeAuthenticationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class); @@ -108,9 +110,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); - JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(STATE_CHANGE_FIELDS); pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); @@ -120,7 +122,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE); if (stateChangeFields.has(ADDITIONAL_FIELDS)) { - JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); + var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS); swVersion = getValueFromJson(additionalFields, SW_VERSION); } @@ -128,7 +130,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { if (StringUtils.isEmpty(pnfSourceName) || authenticationStatusMissing(oldAuthenticationStatus) || authenticationStatusMissing(newAuthenticationStatus)) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -164,7 +166,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java index 09aa7303..df8eaa40 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> { +public class CpeAuthenticationJsonBodyBuilder { /** * Serialize the CPE authentication DMaaP model with GSON. * @param cpeAuthenticationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder() .correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId()) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java new file mode 100644 index 00000000..be8bea09 --- /dev/null +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.bbs.event.processor.utilities; + +import java.nio.file.Paths; + +import org.jetbrains.annotations.NotNull; +import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +public class GenericUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class); + + private GenericUtils() {} + + /** + * Creates Message Router subscription request. + * @param topicUrl URL of topic to use + * @param consumerGroup Consumer Group for subscription + * @param consumerId Consumer ID for subscription + * @return request based on provided input + */ + public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId) { + var sourceDefinition = ImmutableMessageRouterSource.builder() + .name("Subscriber source") + .topicUrl(topicUrl) + .build(); + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(consumerGroup) + .consumerId(consumerId) + .build(); + } + + /** + * Creates Message Router publish request. + * @param topicUrl URL of topic to use + * @return request based on provided input + */ + public static MessageRouterPublishRequest createPublishRequest(String topicUrl) { + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .topicUrl(topicUrl) + .name("Producer sink") + .build(); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.APPLICATION_JSON) + .build(); + } + + /** + * Creates a security key store for HTTPS. + * @param resource identifying the resource from which we will read security information + * @return store that will be used for HTTPS + */ + @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) { + if (StringUtils.isEmpty(resource)) { + throw new ApplicationEnvironmentException("Resource for security key store is empty"); + } + var path = Paths.get(resource); + LOGGER.info("Reading keys from {}", path.toString()); + return SecurityKeysStore.fromPath(path); + } +} diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java index 9fe0c277..5e249e57 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java @@ -41,11 +41,13 @@ import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapMo import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class ReRegistrationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class); @@ -101,7 +103,7 @@ public class ReRegistrationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject pnfReRegistrationFields = + var pnfReRegistrationFields = dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS); pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID); @@ -112,7 +114,7 @@ public class ReRegistrationDmaapConsumerJsonParser { svlan = getValueFromJson(pnfReRegistrationFields, SVLAN); if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -148,7 +150,7 @@ public class ReRegistrationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java index 867cfda7..bf90a4c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> { +public class ReRegistrationJsonBodyBuilder { /** * Serialize the Re-Registration DMaaP model with GSON. * @param reRegistrationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(reRegistrationConsumerDmaapModel.getCorrelationId()) |