diff options
Diffstat (limited to 'components/bbs-event-processor/src')
41 files changed, 1258 insertions, 1254 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java index 66e1f86d..94b955f1 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java @@ -71,7 +71,7 @@ public class Application { @Bean TaskScheduler threadPoolTaskScheduler() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + var scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(THREADS_IN_POOL); scheduler.setThreadNamePrefix("pipeline-thrd-"); return scheduler; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java index 5022a693..33a935e5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java @@ -21,7 +21,10 @@ package org.onap.bbs.event.processor.config; import static org.onap.bbs.event.processor.config.ApplicationConstants.STREAMS_TYPE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource; +import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath; +import java.nio.file.Paths; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -31,10 +34,12 @@ import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException; import org.onap.bbs.event.processor.exceptions.ConfigurationParsingException; import org.onap.bbs.event.processor.model.GeneratedAppConfigObject; import org.onap.bbs.event.processor.utilities.LoggingUtil; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -48,9 +53,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { private final SecurityProperties securityProperties; private final GenericProperties genericProperties; - private DmaapConsumerConfiguration dmaapReRegistrationConsumerConfiguration; - private DmaapConsumerConfiguration dmaapCpeAuthenticationConsumerConfiguration; - private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private MessageRouterSubscriberConfig dmaapReRegistrationConsumerConfiguration; + private MessageRouterSubscriberConfig dmaapCpeAuthenticationConsumerConfiguration; + private MessageRouterPublisherConfig dmaapPublisherConfiguration; private AaiClientConfiguration aaiClientConfiguration; private Set<ConfigurationChangeObserver> observers; @@ -97,15 +102,15 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { observers.forEach(ConfigurationChangeObserver::updateConfiguration); } - public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() { + public synchronized MessageRouterSubscriberConfig getDmaapReRegistrationConsumerConfiguration() { return dmaapReRegistrationConsumerConfiguration; } - public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() { + public synchronized MessageRouterSubscriberConfig getDmaapCpeAuthenticationConsumerConfiguration() { return dmaapCpeAuthenticationConsumerConfiguration; } - public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized MessageRouterPublisherConfig getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } @@ -117,6 +122,18 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { return genericProperties.getPipelinesPollingIntervalSec(); } + public synchronized DmaapProducerProperties getDmaapProducerProperties() { + return dmaapProducerProperties; + } + + public synchronized DmaapReRegistrationConsumerProperties getDmaapReRegistrationConsumerProperties() { + return dmaapReRegistrationConsumerProperties; + } + + public synchronized DmaapCpeAuthenticationConsumerProperties getDmaapCpeAuthenticationConsumerProperties() { + return dmaapCpeAuthenticationConsumerProperties; + } + public synchronized int getPipelinesTimeoutInSeconds() { return genericProperties.getPipelinesTimeoutSec(); } @@ -180,11 +197,17 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath()); securityProperties.setTrustStorePath(newConfiguration.trustStorePath()); securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath()); - - GeneratedAppConfigObject.StreamsObject reRegObject = + final SecurityKeys securityKeys = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + + var reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(), "PNF Re-Registration"); - TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); + var topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl()); dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); @@ -196,9 +219,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapReRegistrationConfiguration(); + constructDmaapReRegistrationConfiguration(securityKeys); - GeneratedAppConfigObject.StreamsObject cpeAuthObject = + var cpeAuthObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(), "CPE Authentication"); topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl()); @@ -213,9 +236,9 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup()); dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit()); dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs()); - constructDmaapCpeAuthenticationConfiguration(); + constructDmaapCpeAuthenticationConfiguration(securityKeys); - GeneratedAppConfigObject.StreamsObject closeLoopObject = + var closeLoopObject = getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(), "Close Loop"); topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl()); @@ -226,7 +249,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword()); dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); - constructDmaapProducerConfiguration(); + constructDmaapProducerConfiguration(securityKeys); aaiClientProperties.setAaiHost(newConfiguration.aaiHost()); aaiClientProperties.setAaiPort(newConfiguration.aaiPort()); @@ -258,7 +281,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { @NotNull private GeneratedAppConfigObject.StreamsObject getStreamsObject( Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) { - GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey); + var streamsObject = map.get(configKey); if (!STREAMS_TYPE.equals(streamsObject.type())) { throw new ApplicationEnvironmentException(String.format("%s requires information about" + " message-router topic in ONAP", messageName)); @@ -267,80 +290,45 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { } private void constructConfigurationObjects() { - constructDmaapReRegistrationConfiguration(); - constructDmaapCpeAuthenticationConfiguration(); - constructDmaapProducerConfiguration(); + final SecurityKeys securityKeysForReRegistration = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapReRegistrationConfiguration(securityKeysForReRegistration); + final SecurityKeys securityKeysForCpeAuthentication = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapCpeAuthenticationConfiguration(securityKeysForCpeAuthentication); + final SecurityKeys securityKeysForProducer = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath())) + .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath()))) + .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath())) + .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath()))) + .build(); + constructDmaapProducerConfiguration(securityKeysForProducer); constructAaiConfiguration(); } - private void constructDmaapReRegistrationConfiguration() { - dmaapReRegistrationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapHostName(dmaapReRegistrationConsumerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapReRegistrationConsumerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapReRegistrationConsumerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapReRegistrationConsumerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapReRegistrationConsumerProperties.getDmaapUserName() == null ? "" : - dmaapReRegistrationConsumerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapReRegistrationConsumerProperties.getDmaapUserPassword() == null ? "" : - dmaapReRegistrationConsumerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapReRegistrationConsumerProperties.getDmaapContentType()) - .consumerId(dmaapReRegistrationConsumerProperties.getConsumerId()) - .consumerGroup(dmaapReRegistrationConsumerProperties.getConsumerGroup()) - .timeoutMs(dmaapReRegistrationConsumerProperties.getTimeoutMs()) - .messageLimit(dmaapReRegistrationConsumerProperties.getMessageLimit()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapReRegistrationConfiguration(final SecurityKeys securityKeys) { + dmaapReRegistrationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(securityKeys) .build(); } - private void constructDmaapCpeAuthenticationConfiguration() { - dmaapCpeAuthenticationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapHostName(dmaapCpeAuthenticationConsumerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapCpeAuthenticationConsumerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapCpeAuthenticationConsumerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapCpeAuthenticationConsumerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapCpeAuthenticationConsumerProperties.getDmaapUserName() == null ? "" : - dmaapCpeAuthenticationConsumerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword() == null ? "" : - dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapCpeAuthenticationConsumerProperties.getDmaapContentType()) - .consumerId(dmaapCpeAuthenticationConsumerProperties.getConsumerId()) - .consumerGroup(dmaapCpeAuthenticationConsumerProperties.getConsumerGroup()) - .timeoutMs(dmaapCpeAuthenticationConsumerProperties.getTimeoutMs()) - .messageLimit(dmaapCpeAuthenticationConsumerProperties.getMessageLimit()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapCpeAuthenticationConfiguration(final SecurityKeys securityKeys) { + dmaapCpeAuthenticationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(securityKeys) .build(); } - private void constructDmaapProducerConfiguration() { - dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapHostName(dmaapProducerProperties.getDmaapHostName()) - .dmaapPortNumber(dmaapProducerProperties.getDmaapPortNumber()) - .dmaapProtocol(dmaapProducerProperties.getDmaapProtocol()) - .dmaapTopicName(dmaapProducerProperties.getDmaapTopicName()) - .dmaapUserName( - dmaapProducerProperties.getDmaapUserName() == null ? "" : - dmaapProducerProperties.getDmaapUserName()) - .dmaapUserPassword( - dmaapProducerProperties.getDmaapUserPassword() == null ? "" : - dmaapProducerProperties.getDmaapUserPassword()) - .dmaapContentType(dmaapProducerProperties.getDmaapContentType()) - .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth()) - .keyStorePath(securityProperties.getKeyStorePath()) - .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath()) - .trustStorePath(securityProperties.getTrustStorePath()) - .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath()) + private void constructDmaapProducerConfiguration(final SecurityKeys securityKeys) { + dmaapPublisherConfiguration = ImmutableMessageRouterPublisherConfig.builder() + .securityKeys(securityKeys) .build(); } @@ -362,19 +350,19 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { } private TopicUrlInfo parseTopicUrl(String topicUrl) { - String[] urlTokens = topicUrl.split(":"); + var urlTokens = topicUrl.split(":"); if (urlTokens.length != 3) { throw new ConfigurationParsingException("Wrong topic URL format"); } - TopicUrlInfo topicUrlInfo = new TopicUrlInfo(); + var topicUrlInfo = new TopicUrlInfo(); topicUrlInfo.setHost(urlTokens[1].replace("/", "")); - String[] tokensAfterHost = urlTokens[2].split("/events/"); + var tokensAfterHost = urlTokens[2].split("/events/"); if (tokensAfterHost.length != 2) { throw new ConfigurationParsingException("Wrong topic name structure"); } topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0])); - topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]); + topicUrlInfo.setTopicName(tokensAfterHost[1]); return topicUrlInfo; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java index d2ac3c10..7be6bdea 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java @@ -35,5 +35,9 @@ public class ApplicationConstants { // Close Loop Constants public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance"; + // Subscribe for DMaaP + public static final String SUBSCRIBE_URL_TEMPLATE = "%s://%s:%d/events/%s"; + public static final String PUBLISH_URL_TEMPLATE = "%s://%s:%d/events/%s"; + private ApplicationConstants() {} }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java index 607b3b31..4ac85495 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java @@ -40,8 +40,7 @@ import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableStreamsObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,9 +84,9 @@ public class ConsulConfigurationGateway { } boolean environmentNotReady() { - String consulHost = System.getenv().get(CONSUL_HOST); - String cbs = System.getenv().get(CONFIG_BINDING_SERVICE); - String hostname = System.getenv().get(HOSTNAME); + var consulHost = System.getenv().get(CONSUL_HOST); + var cbs = System.getenv().get(CONFIG_BINDING_SERVICE); + var hostname = System.getenv().get(HOSTNAME); return consulHost == null || cbs == null || hostname == null; } @@ -106,7 +105,7 @@ public class ConsulConfigurationGateway { private void parseConsulRetrievedConfiguration(JsonObject jsonObject) { - GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject); + var generatedAppConfigObject = generateAppConfigObject(jsonObject); LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject); configuration.updateCurrentConfiguration(generatedAppConfigObject); } @@ -124,10 +123,10 @@ public class ConsulConfigurationGateway { RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) - EnvProperties env = EnvProperties.fromEnvironment(); - CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext); + var cbsClientConfig = CbsClientConfiguration.fromEnvironment(); + var cbsRequest = CbsRequests.getConfiguration(diagnosticContext); // Create the client and use it to get the configuration - cbsFetchPipeline = CbsClientFactory.createCbsClient(env) + cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig) .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) .retry(e -> true) .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period)) @@ -138,57 +137,57 @@ public class ConsulConfigurationGateway { GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) { if (LOGGER.isInfoEnabled()) { - String configAsString = gson.toJson(configObject); + var configAsString = gson.toJson(configObject); LOGGER.info("Received App Config object\n{}", configAsString); } - final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString(); - final String dmaapContentType = configObject.get("dmaap.contentType").getAsString(); - final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString(); - final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString(); - final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt(); - final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt(); - - final String aaiHost = configObject.get("aai.host").getAsString(); - final int aaiPort = configObject.get("aai.port").getAsInt(); - final String aaiProtocol = configObject.get("aai.protocol").getAsString(); - final String aaiUsername = configObject.get("aai.username").getAsString(); - final String aaiPassword = configObject.get("aai.password").getAsString(); - final boolean aaiIgnoreSslCertificateErrors = + final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString(); + final var dmaapContentType = configObject.get("dmaap.contentType").getAsString(); + final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString(); + final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString(); + final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt(); + final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt(); + + final var aaiHost = configObject.get("aai.host").getAsString(); + final var aaiPort = configObject.get("aai.port").getAsInt(); + final var aaiProtocol = configObject.get("aai.protocol").getAsString(); + final var aaiUsername = configObject.get("aai.username").getAsString(); + final var aaiPassword = configObject.get("aai.password").getAsString(); + final var aaiIgnoreSslCertificateErrors = configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean(); - final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt(); - final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt(); - final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt(); + final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt(); + final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt(); + final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt(); - final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString(); - final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString(); - final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString(); - final String cpeAuthClControlName = + final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString(); + final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString(); + final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString(); + final var cpeAuthClControlName = configObject.get("application.cpe.authentication.clControlName").getAsString(); - final String policyVersion = configObject.get("application.policyVersion").getAsString(); - final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString(); - final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString(); - final String closeLoopVersion = configObject.get("application.clVersion").getAsString(); - final String closeLoopTarget = configObject.get("application.clTarget").getAsString(); - final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString(); + final var policyVersion = configObject.get("application.policyVersion").getAsString(); + final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString(); + final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString(); + final var closeLoopVersion = configObject.get("application.clVersion").getAsString(); + final var closeLoopTarget = configObject.get("application.clTarget").getAsString(); + final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString(); - final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString(); - final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString(); - final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString(); + final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString(); + final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString(); + final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString(); - final String loggingLevel = configObject.get("application.loggingLevel").getAsString(); + final var loggingLevel = configObject.get("application.loggingLevel").getAsString(); - final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); - final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); - final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); - final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); - final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); - final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); + final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); + final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); + final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); + final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); + final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); + final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); - final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes"); - final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); + final var streamsPublishes = configObject.getAsJsonObject("streams_publishes"); + final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); return ImmutableGeneratedAppConfigObject.builder() .dmaapProtocol(dmaapProtocol) @@ -259,22 +258,22 @@ public class ConsulConfigurationGateway { private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject( Map.Entry<String, JsonElement> jsonEntry) { - JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue(); + var closeLoopOutput = (JsonObject) jsonEntry.getValue(); - String type = closeLoopOutput.get("type").getAsString(); - String aafUsername = closeLoopOutput.get("aaf_username") != null + var type = closeLoopOutput.get("type").getAsString(); + var aafUsername = closeLoopOutput.get("aaf_username") != null ? closeLoopOutput.get("aaf_username").getAsString() : ""; - String aafPassword = closeLoopOutput.get("aaf_password") != null + var aafPassword = closeLoopOutput.get("aaf_password") != null ? closeLoopOutput.get("aaf_password").getAsString() : ""; - JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info"); - String clientId = dmaapInfo.get("client_id") != null + var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info"); + var clientId = dmaapInfo.get("client_id") != null ? dmaapInfo.get("client_id").getAsString() : ""; - String clientRole = dmaapInfo.get("client_role") != null + var clientRole = dmaapInfo.get("client_role") != null ? dmaapInfo.get("client_role").getAsString() : ""; - String location = dmaapInfo.get("location") != null + var location = dmaapInfo.get("location") != null ? dmaapInfo.get("location").getAsString() : ""; - String topicUrl = dmaapInfo.get("topic_url").getAsString(); + var topicUrl = dmaapInfo.get("topic_url").getAsString(); GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder() .clientId(clientId) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java new file mode 100644 index 00000000..b0ecfd51 --- /dev/null +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= +*/ + +package org.onap.bbs.event.processor.config; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MessageRouterConfig { + + @Bean(name = "ReRegMessageRouterSubscriber") + public MessageRouterSubscriber reRegistrationMessageRouterSubscriber(ApplicationConfiguration configuration) { + return DmaapClientFactory + .createMessageRouterSubscriber(configuration.getDmaapReRegistrationConsumerConfiguration()); + } + + @Bean(name = "CpeAuthMessageRouterSubscriber") + public MessageRouterSubscriber registrationMessageRouterSubscriber(ApplicationConfiguration configuration) { + return DmaapClientFactory + .createMessageRouterSubscriber(configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + } + + @Bean + public MessageRouterPublisher mrPub(ApplicationConfiguration configuration) { + return DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + } +} diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java index 1be4f43a..e2e49797 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java @@ -22,10 +22,6 @@ package org.onap.bbs.event.processor.exceptions; public class EmptyDmaapResponseException extends RuntimeException { - public EmptyDmaapResponseException() { - super(); - } - public EmptyDmaapResponseException(String message) { super(message); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java index 8fa73e42..c952e9bd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java @@ -26,11 +26,10 @@ import java.util.Map; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface ControlLoopPublisherDmaapModel extends DmaapModel { +public interface ControlLoopPublisherDmaapModel { @SerializedName(value = "closedLoopEventClient", alternate = "closedLoopEventClient") String getClosedLoopEventClient(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java index 42c9896f..50ed66ef 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java @@ -26,12 +26,10 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface CpeAuthenticationConsumerDmaapModel extends AaiModel, DmaapModel { +public interface CpeAuthenticationConsumerDmaapModel { @SerializedName(value = "correlationId", alternate = "correlationId") String getCorrelationId(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java index 682c0641..f64f30e5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java @@ -26,11 +26,10 @@ import java.util.Optional; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true, emptyAsNulls = true) -public interface PnfAaiObject extends ClientModel { +public interface PnfAaiObject { @SerializedName(value = "pnf-name", alternate = "pnf-name") String getPnfName(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java index 07fb75aa..3f58aa33 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java @@ -24,12 +24,10 @@ import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; @Value.Immutable @Gson.TypeAdapters(fieldNamingStrategy = true) -public interface ReRegistrationConsumerDmaapModel extends AaiModel, DmaapModel { +public interface ReRegistrationConsumerDmaapModel { @SerializedName(value = "correlationId", alternate = "correlationId") String getCorrelationId(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java index a30903bb..2cab017f 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java @@ -25,13 +25,11 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_ import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -42,14 +40,13 @@ import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.bbs.event.processor.model.MetadataListAaiObject; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.RelationshipListAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -99,7 +96,7 @@ public class CpeAuthenticationPipeline { LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started"); } - Flux<HttpResponse> executePipeline() { + Flux<MessageRouterPublishResponse> executePipeline() { return // Consume CPE Authentication from DMaaP consumeCpeAuthenticationFromDmaap() @@ -111,12 +108,12 @@ public class CpeAuthenticationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(HttpResponse responseCode) { - MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); - LOGGER.info("CPE Authentication event successfully handled. " - + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.statusCode(), responseCode.statusReason()); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + LOGGER.info("CPE Authentication event successfully handled. Published Policy event to DMaaP"); + } else { + LOGGER.error("CPE Authentication event handling error [{}]", response.failReason()); + } } private void onError(Throwable throwable) { @@ -149,7 +146,7 @@ public class CpeAuthenticationPipeline { .map(event -> { // For each message, we have to keep separate state. This state will be enhanced // in each step and handed off to the next processing step - PipelineState state = new PipelineState(); + var state = new PipelineState(); state.setCpeAuthenticationEvent(event); return state; }); @@ -161,9 +158,9 @@ public class CpeAuthenticationPipeline { private Mono<PipelineState> fetchPnfFromAai(PipelineState state) { - CpeAuthenticationConsumerDmaapModel vesEvent = state.getCpeAuthenticationEvent(); - String pnfName = vesEvent.getCorrelationId(); - String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); + var vesEvent = state.getCpeAuthenticationEvent(); + var pnfName = vesEvent.getCorrelationId(); + var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url); return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url) @@ -191,10 +188,10 @@ public class CpeAuthenticationPipeline { return Mono.empty(); } - PnfAaiObject pnf = state.getPnfAaiObject(); + var pnf = state.getPnfAaiObject(); // Assuming that the PNF will only have a single service-instance relationship pointing // towards the HSI CFS service - String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() + var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -208,7 +205,7 @@ public class CpeAuthenticationPipeline { return Mono.empty(); } - String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", serviceInstanceId); LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url); return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url) @@ -230,21 +227,13 @@ public class CpeAuthenticationPipeline { }); } - private Mono<HttpResponse> triggerPolicy(PipelineState state) { + private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { - return Mono.empty(); - } - - // At this point, we must check if the PNF RGW MAC address matches the value extracted from VES event - if (!isCorrectMacAddress(state)) { - LOGGER.warn("Processing stopped. RGW MAC address taken from event ({}) " - + "does not match with A&AI metadata corresponding value", - state.getCpeAuthenticationEvent().getRgwMacAddress()); - return Mono.empty(); + return Flux.empty(); } - ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state); + var event = buildTriggeringPolicyEvent(state); return publisherTask.execute(event) .timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds())) .doOnError(TimeoutException.class, @@ -256,30 +245,15 @@ public class CpeAuthenticationPipeline { e -> Mono.empty()); } - - private boolean isCorrectMacAddress(PipelineState state) { - // We need to check if the RGW MAC address received in VES event matches the one found in - // HSIA CFS service (in its metadata section) - Optional<MetadataListAaiObject> optionalMetadata = state.getHsiCfsServiceInstance() - .getMetadataListAaiObject(); - String eventRgwMacAddress = state.getCpeAuthenticationEvent().getRgwMacAddress().orElse(""); - return optionalMetadata - .map(list -> list.getMetadataEntries() - .stream() - .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname()) - && m.getMetavalue().equals(eventRgwMacAddress))) - .orElse(false); - } - private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) { - String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); + var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); Map<String, String> enrichmentData = new HashMap<>(); enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId); enrichmentData.put("cpe.old-authentication-state", state.cpeAuthenticationEvent.getOldAuthenticationState()); enrichmentData.put("cpe.new-authentication-state", state.cpeAuthenticationEvent.getNewAuthenticationState()); - String swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse(""); + var swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse(""); if (!StringUtils.isEmpty(swVersion)) { enrichmentData.put("cpe.swVersion", swVersion); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java index 33a9aea7..0bc5e353 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java @@ -25,12 +25,10 @@ import static org.onap.bbs.event.processor.config.ApplicationConstants.DCAE_BBS_ import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME; import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; import java.time.Duration; import java.time.Instant; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -48,7 +46,7 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -98,7 +96,7 @@ public class ReRegistrationPipeline { LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started"); } - Flux<HttpResponse> executePipeline() { + Flux<MessageRouterPublishResponse> executePipeline() { return // Consume Re-Registration from DMaaP consumeReRegistrationsFromDmaap() @@ -110,12 +108,12 @@ public class ReRegistrationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(HttpResponse responseCode) { - MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); - LOGGER.info("PNF Re-Registration event successfully handled. " - + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.statusCode(), responseCode.statusReason()); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + LOGGER.info("PNF Re-Registration event successfully handled. Published Policy event to DMaaP"); + } else { + LOGGER.error("PNF Re-Registration event handling error [{}]", response.failReason()); + } } private void onError(Throwable throwable) { @@ -148,7 +146,7 @@ public class ReRegistrationPipeline { .map(event -> { // For each message, we have to keep separate state. This state will be enhanced // in each step and handed off to the next processing step - PipelineState state = new PipelineState(); + var state = new PipelineState(); state.setReRegistrationEvent(event); return state; }); @@ -160,9 +158,9 @@ public class ReRegistrationPipeline { private Mono<PipelineState> fetchPnfFromAai(PipelineState state) { - ReRegistrationConsumerDmaapModel vesEvent = state.getReRegistrationEvent(); - String pnfName = vesEvent.getCorrelationId(); - String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); + var vesEvent = state.getReRegistrationEvent(); + var pnfName = vesEvent.getCorrelationId(); + var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName); LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url); return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url) @@ -196,10 +194,10 @@ public class ReRegistrationPipeline { return Mono.empty(); } - PnfAaiObject pnf = state.getPnfAaiObject(); + var pnf = state.getPnfAaiObject(); // Assuming that the PNF will only have a single service-instance relationship pointing // towards the HSI CFS service - String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() + var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> "service-instance".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -213,7 +211,7 @@ public class ReRegistrationPipeline { return Mono.empty(); } - String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", serviceInstanceId); LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url); return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url) @@ -236,8 +234,7 @@ public class ReRegistrationPipeline { } private boolean isNotReallyAnOntRelocation(PipelineState state) { - List<RelationshipListAaiObject.RelationshipEntryAaiObject> relationshipEntries = - state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries(); + var relationshipEntries = state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries(); // If no logical-link, fail further processing if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) { @@ -247,7 +244,7 @@ public class ReRegistrationPipeline { } // Assuming PNF will only have one logical-link per BBS use case design - boolean isNotRelocation = relationshipEntries + var isNotRelocation = relationshipEntries .stream() .filter(e -> "logical-link".equals(e.getRelatedTo())) .flatMap(e -> e.getRelationshipData().stream()) @@ -263,13 +260,13 @@ public class ReRegistrationPipeline { return isNotRelocation; } - private Mono<HttpResponse> triggerPolicy(PipelineState state) { + private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { - return Mono.empty(); + return Flux.empty(); } - ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state); + var event = buildTriggeringPolicyEvent(state); return publisherTask.execute(event) .timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds())) .doOnError(TimeoutException.class, @@ -283,12 +280,12 @@ public class ReRegistrationPipeline { private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) { - String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); + var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId(); - String attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint(); - String remoteId = state.getReRegistrationEvent().getRemoteId(); - String cvlan = state.getReRegistrationEvent().getCVlan(); - String svlan = state.getReRegistrationEvent().getSVlan(); + var attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint(); + var remoteId = state.getReRegistrationEvent().getRemoteId(); + var cvlan = state.getReRegistrationEvent().getCVlan(); + var svlan = state.getReRegistrationEvent().getSVlan(); Map<String, String> enrichmentData = new HashMap<>(); enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java index 5fbb0875..882635ce 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java @@ -96,7 +96,7 @@ public class Scheduler implements ConfigurationChangeObserver { LOGGER.info("BBS event processing pipelines will start in {} seconds " + "and will run periodically every {} seconds", PIPELINES_INITIAL_DELAY_IN_SECONDS, currentPipelinesPollingInterval); - Instant desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS); + var desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS); scheduleProcessingTasks(desiredStartTime, currentPipelinesPollingInterval); // Register for configuration changes @@ -118,7 +118,7 @@ public class Scheduler implements ConfigurationChangeObserver { cancelScheduledProcessingTasks(); reScheduleProcessingTasks(); } - int newCbsPollingInterval = configuration.getCbsPollingInterval(); + var newCbsPollingInterval = configuration.getCbsPollingInterval(); if (newCbsPollingInterval != currentCbsPollingInterval) { if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) { LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job", @@ -222,9 +222,9 @@ public class Scheduler implements ConfigurationChangeObserver { } private int validatePipelinesPollingInterval() { - int pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds(); - boolean isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL; - int verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval; + var pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds(); + var isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL; + var verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval; if (isSmallInterval) { LOGGER.warn("Pipelines Polling interval is too small ({}). Defaulting to {}", pipelinesPollingInterval, DEFAULT_PIPELINES_POLLING_INTERVAL); @@ -233,9 +233,9 @@ public class Scheduler implements ConfigurationChangeObserver { } private int verifyCbsPollingInterval() { - int cbsPollingInterval = configuration.getCbsPollingInterval(); - boolean isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL; - int verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval; + var cbsPollingInterval = configuration.getCbsPollingInterval(); + var isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL; + var verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval; if (isSmallInterval) { LOGGER.warn("CBS Polling interval is too small ({}). Defaulting to {}", cbsPollingInterval, DEFAULT_CBS_POLLING_INTERVAL); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index da510281..153cb91b 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,59 +20,61 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Component -public class DmaapCpeAuthenticationConsumerTaskImpl - implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver { +public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask, + ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = - new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); - - private DMaaPConsumerReactiveHttpClient httpClient; + new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); @Autowired - public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration, - CpeAuthenticationDmaapConsumerJsonParser - cpeAuthenticationDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException { + @Qualifier("CpeAuthMessageRouterSubscriber") + MessageRouterSubscriber subscriber, + CpeAuthenticationDmaapConsumerJsonParser parser) { + this.cpeAuthenticationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +89,25 @@ public class DmaapCpeAuthenticationConsumerTaskImpl @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @Override public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest) + .flatMap(jsonElement -> + cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +115,4 @@ public class DmaapCpeAuthenticationConsumerTaskImpl } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index 749c4e53..dec1dbcd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,11 +21,11 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; public interface DmaapPublisherTask { - Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); + Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 283e5ef9..6c50b10d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,54 +20,48 @@ package org.onap.bbs.event.processor.tasks; -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; @Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private ApplicationConfiguration configuration; - private final PublisherReactiveHttpClientFactory httpClientFactory; - private DMaaPPublisherReactiveHttpClient httpClient; + private ApplicationConfiguration configuration; + private MessageRouterPublisher publisher; + private String publishUrl; + private MessageRouterPublishRequest publishRequest; @Autowired - DmaapPublisherTaskImpl(ApplicationConfiguration configuration) { - this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), - new ControlLoopJsonBodyBuilder())); - } - - DmaapPublisherTaskImpl(ApplicationConfiguration configuration, - PublisherReactiveHttpClientFactory httpClientFactory) { + DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) { this.configuration = configuration; - this.httpClientFactory = httpClientFactory; - - try { - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + this.publisher = publisher; + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + this.configuration.getDmaapProducerProperties().getDmaapProtocol(), + this.configuration.getDmaapProducerProperties().getDmaapHostName(), + this.configuration.getDmaapProducerProperties().getDmaapPortNumber(), + this.configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @PostConstruct @@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - try { - LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + publisher = + DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + configuration.getDmaapProducerProperties().getDmaapProtocol(), + configuration.getDmaapProducerProperties().getDmaapHostName(), + configuration.getDmaapProducerProperties().getDmaapPortNumber(), + configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @Override - public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { - if (controlLoopPublisherDmaapModel == null) { + public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) { + if (event == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } LOGGER.info("Executing task for publishing control loop message"); - LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); - DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - } - - private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { - return httpClient; + LOGGER.debug("CL message \n{}", event); + return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event))); } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index e40037b1..aff563c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,25 +20,23 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @@ -49,30 +47,33 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP"); - private DMaaPConsumerReactiveHttpClient httpClient; - @Autowired - public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new ReRegistrationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration, - ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) - throws SSLException { + @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber, + ReRegistrationDmaapConsumerJsonParser parser) { + this.reRegistrationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +88,25 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapReRegistrationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @Override public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) + .flatMap(jsonElement -> + reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +114,4 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 84fc9f7d..c0e9b1c4 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -20,6 +20,8 @@ package org.onap.bbs.event.processor.utilities; +import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource; +import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import com.google.gson.Gson; @@ -27,9 +29,10 @@ import com.google.gson.JsonSyntaxException; import io.netty.handler.ssl.SslContext; +import java.nio.file.Paths; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.AaiClientConfiguration; import org.onap.bbs.event.processor.config.ApplicationConfiguration; @@ -37,7 +40,9 @@ import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.AaiTaskException; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; -import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -64,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private AaiClientConfiguration aaiClientConfiguration; @Autowired - AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException { + AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) { this.configuration = configuration; this.gson = gson; this.sslFactory = new SslFactory(); @@ -85,25 +90,20 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { @Override public void updateConfiguration() { - AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration(); + var newConfiguration = configuration.getAaiClientConfiguration(); if (aaiClientConfiguration.equals(newConfiguration)) { LOGGER.info("No Configuration changes necessary for AAI Reactive client"); } else { synchronized (this) { LOGGER.info("AAI Reactive client must be re-configured"); aaiClientConfiguration = newConfiguration; - try { - setupWebClient(); - } catch (SSLException e) { - LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient"); - LOGGER.debug("SSL Exception\n", e); - } + setupWebClient(); } } } - private void setupWebClient() throws SSLException { - SslContext sslContext = createSslContext(); + private void setupWebClient() { + var sslContext = createSslContext(); ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector( HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); @@ -132,7 +132,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) { LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName()); - WebClient webClient = getWebClient(); + var webClient = getWebClient(); return webClient .get() .uri(url) @@ -179,17 +179,18 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { }); } - private SslContext createSslContext() throws SSLException { + private SslContext createSslContext() { if (aaiClientConfiguration.enableAaiCertAuth()) { LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration); - return sslFactory.createSecureContext( - aaiClientConfiguration.keyStorePath(), - aaiClientConfiguration.keyStorePasswordPath(), - aaiClientConfiguration.trustStorePath(), - aaiClientConfiguration.trustStorePasswordPath() - ); + final SecurityKeys securityKeys = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath())) + .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath()))) + .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath())) + .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath()))) + .build(); + return sslFactory.createSecureClientContext(securityKeys); } - return sslFactory.createInsecureContext(); + return sslFactory.createInsecureClientContext(); } private synchronized WebClient getWebClient() { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java index c2b67348..b3fa09db 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java @@ -20,7 +20,7 @@ package org.onap.bbs.event.processor.utilities; -public class CommonEventFields { +class CommonEventFields { static final String COMMON_FORMAT = "\": \"%s\""; static final String EVENT = "event"; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java index 03ea0936..f27cca5e 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java @@ -21,24 +21,23 @@ package org.onap.bbs.event.processor.utilities; import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; import com.google.gson.TypeAdapterFactory; import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> { +public class ControlLoopJsonBodyBuilder { /** * Serialize the Control Loop DMaaP model with GSON. * @param publisherDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder() .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) @@ -56,4 +55,30 @@ public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPu .originator(publisherDmaapModel.getOriginator()) .build()); } + + /** + * Serialize the Control Loop DMaaP model with GSON. + * @param publisherDmaapModel object to be serialized + * @return String output of serialization + */ + public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) { + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder() + .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) + .policyVersion(publisherDmaapModel.getPolicyVersion()) + .policyName(publisherDmaapModel.getPolicyName()) + .policyScope(publisherDmaapModel.getPolicyScope()) + .targetType(publisherDmaapModel.getTargetType()) + .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData()) + .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart()) + .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus()) + .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName()) + .version(publisherDmaapModel.getVersion()) + .target(publisherDmaapModel.getTarget()) + .requestId(publisherDmaapModel.getRequestId()) + .originator(publisherDmaapModel.getOriginator()) + .build()); + } + } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java index 3cff4e65..5a0466ac 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java @@ -46,11 +46,13 @@ import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class CpeAuthenticationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class); @@ -108,9 +110,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); - JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(STATE_CHANGE_FIELDS); pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); @@ -120,7 +122,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE); if (stateChangeFields.has(ADDITIONAL_FIELDS)) { - JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); + var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS); swVersion = getValueFromJson(additionalFields, SW_VERSION); } @@ -128,7 +130,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { if (StringUtils.isEmpty(pnfSourceName) || authenticationStatusMissing(oldAuthenticationStatus) || authenticationStatusMissing(newAuthenticationStatus)) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -164,7 +166,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java index 09aa7303..df8eaa40 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> { +public class CpeAuthenticationJsonBodyBuilder { /** * Serialize the CPE authentication DMaaP model with GSON. * @param cpeAuthenticationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder() .correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId()) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java new file mode 100644 index 00000000..be8bea09 --- /dev/null +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.bbs.event.processor.utilities; + +import java.nio.file.Paths; + +import org.jetbrains.annotations.NotNull; +import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +public class GenericUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class); + + private GenericUtils() {} + + /** + * Creates Message Router subscription request. + * @param topicUrl URL of topic to use + * @param consumerGroup Consumer Group for subscription + * @param consumerId Consumer ID for subscription + * @return request based on provided input + */ + public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId) { + var sourceDefinition = ImmutableMessageRouterSource.builder() + .name("Subscriber source") + .topicUrl(topicUrl) + .build(); + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(consumerGroup) + .consumerId(consumerId) + .build(); + } + + /** + * Creates Message Router publish request. + * @param topicUrl URL of topic to use + * @return request based on provided input + */ + public static MessageRouterPublishRequest createPublishRequest(String topicUrl) { + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .topicUrl(topicUrl) + .name("Producer sink") + .build(); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.APPLICATION_JSON) + .build(); + } + + /** + * Creates a security key store for HTTPS. + * @param resource identifying the resource from which we will read security information + * @return store that will be used for HTTPS + */ + @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) { + if (StringUtils.isEmpty(resource)) { + throw new ApplicationEnvironmentException("Resource for security key store is empty"); + } + var path = Paths.get(resource); + LOGGER.info("Reading keys from {}", path.toString()); + return SecurityKeysStore.fromPath(path); + } +} diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java index 9fe0c277..5e249e57 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java @@ -41,11 +41,13 @@ import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapMo import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class ReRegistrationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class); @@ -101,7 +103,7 @@ public class ReRegistrationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject pnfReRegistrationFields = + var pnfReRegistrationFields = dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS); pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID); @@ -112,7 +114,7 @@ public class ReRegistrationDmaapConsumerJsonParser { svlan = getValueFromJson(pnfReRegistrationFields, SVLAN); if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -148,7 +150,7 @@ public class ReRegistrationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java index 867cfda7..bf90a4c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> { +public class ReRegistrationJsonBodyBuilder { /** * Serialize the Re-Registration DMaaP model with GSON. * @param reRegistrationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(reRegistrationConsumerDmaapModel.getCorrelationId()) diff --git a/components/bbs-event-processor/src/main/resources/application.yml b/components/bbs-event-processor/src/main/resources/application.yml index 9092adae..ed99643d 100644 --- a/components/bbs-event-processor/src/main/resources/application.yml +++ b/components/bbs-event-processor/src/main/resources/application.yml @@ -9,7 +9,7 @@ configs: re-registration: dmaapHostName: localhost dmaapPortNumber: 2222 - dmaapTopicName: /events/unauthenticated.PNF_Update + dmaapTopicName: unauthenticated.PNF_Update dmaapProtocol: http dmaapContentType: application/json consumerId: c12 @@ -19,7 +19,7 @@ configs: cpe-authentication: dmaapHostName: localhost dmaapPortNumber: 2222 - dmaapTopicName: /events/unauthenticated.CPE_Authentication + dmaapTopicName: unauthenticated.CPE_Authentication dmaapProtocol: http dmaapContentType: application/json consumerId: c12 @@ -29,7 +29,7 @@ configs: producer: dmaapHostName: localhost dmaapPortNumber: 2223 - dmaapTopicName: /events/unauthenticated.DCAE_CL_OUTPUT + dmaapTopicName: unauthenticated.DCAE_CL_OUTPUT dmaapProtocol: http dmaapContentType: application/json aai: @@ -47,14 +47,14 @@ configs: Real-Time: true Content-Type: application/json security: - trustStorePath: change it - trustStorePasswordPath: change it - keyStorePath: change it - keyStorePasswordPath: change it + trustStorePath: /opt/app/bbs-event-processor/etc/cert/trust.jks + trustStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/trust.pass + keyStorePath: /opt/app/bbs-event-processor/etc/cert/cert.jks + keyStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/jks.pass enableAaiCertAuth: false enableDmaapCertAuth: false application: - pipelinesPollingIntervalSec: 30 + pipelinesPollingIntervalSec: 25 pipelinesTimeoutSec: 15 policyVersion: 1.0.0.5 clTargetType: VM diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java index 69fbb3f0..2d9e49fd 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java @@ -37,8 +37,6 @@ import org.onap.bbs.event.processor.model.GeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableDmaapInfo; import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableStreamsObject; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.context.SpringBootTest; @@ -66,6 +64,8 @@ import org.springframework.test.context.TestPropertySource; "configs.aai.client.aaiHeaders.Content-Type=application/merge-patch+json", "configs.dmaap.consumer.re-registration.dmaapHostName=test localhost", "configs.dmaap.consumer.re-registration.dmaapPortNumber=1234", + "configs.dmaap.consumer.re-registration.dmaapUserName=", + "configs.dmaap.consumer.re-registration.dmaapUserPassword=", "configs.dmaap.consumer.re-registration.dmaapTopicName=/events/unauthenticated.PNF_REREGISTRATION", "configs.dmaap.consumer.re-registration.dmaapProtocol=http", "configs.dmaap.consumer.re-registration.dmaapContentType=application/json", @@ -75,6 +75,8 @@ import org.springframework.test.context.TestPropertySource; "configs.dmaap.consumer.re-registration.messageLimit=1", "configs.dmaap.consumer.cpe-authentication.dmaapHostName=test localhost", "configs.dmaap.consumer.cpe-authentication.dmaapPortNumber=1234", + "configs.dmaap.consumer.cpe-authentication.dmaapUserName=", + "configs.dmaap.consumer.cpe-authentication.dmaapUserPassword=", "configs.dmaap.consumer.cpe-authentication.dmaapTopicName=/events/unauthenticated.CPE_AUTHENTICATION", "configs.dmaap.consumer.cpe-authentication.dmaapProtocol=http", "configs.dmaap.consumer.cpe-authentication.dmaapContentType=application/json", @@ -84,13 +86,15 @@ import org.springframework.test.context.TestPropertySource; "configs.dmaap.consumer.cpe-authentication.messageLimit=1", "configs.dmaap.producer.dmaapHostName=test localhost", "configs.dmaap.producer.dmaapPortNumber=1234", + "configs.dmaap.producer.dmaapUserName=", + "configs.dmaap.producer.dmaapUserPassword=", "configs.dmaap.producer.dmaapTopicName=/events/unauthenticated.DCAE_CL_OUTPUT", "configs.dmaap.producer.dmaapProtocol=http", "configs.dmaap.producer.dmaapContentType=application/json", - "configs.security.trustStorePath=test trust store path", - "configs.security.trustStorePasswordPath=test trust store password path", - "configs.security.keyStorePath=test key store path", - "configs.security.keyStorePasswordPath=test key store password path", + "configs.security.trustStorePath=KeyStore.jks", + "configs.security.trustStorePasswordPath=KeyStorePass.txt", + "configs.security.keyStorePath=KeyStore.jks", + "configs.security.keyStorePasswordPath=KeyStorePass.txt", "configs.security.enableDmaapCertAuth=false", "configs.security.enableAaiCertAuth=false", "configs.application.pipelinesPollingIntervalSec=30", @@ -132,7 +136,7 @@ class ApplicationConfigurationTest { @Test void testA_configurationObjectSuccessfullyPopulated() { - AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration(); + var aaiClientConfiguration = configuration.getAaiClientConfiguration(); assertAll("AAI Client Configuration Properties", () -> assertEquals("test localhost", aaiClientConfiguration.aaiHost()), () -> assertEquals(Integer.valueOf(1234), aaiClientConfiguration.aaiPort()), @@ -148,50 +152,64 @@ class ApplicationConfigurationTest { aaiClientConfiguration.aaiHeaders().get("Content-Type")) ); - DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig = - configuration.getDmaapReRegistrationConsumerConfiguration(); assertAll("DMaaP Consumer Re-Registration Configuration Properties", - () -> assertEquals("test localhost", dmaapConsumerReRegistrationConfig.dmaapHostName()), - () -> assertEquals(Integer.valueOf(1234), dmaapConsumerReRegistrationConfig.dmaapPortNumber()), + () -> assertEquals("test localhost", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()), + () -> assertEquals(1234, + configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()), () -> assertEquals("/events/unauthenticated.PNF_REREGISTRATION", - dmaapConsumerReRegistrationConfig.dmaapTopicName()), - () -> assertEquals("http", dmaapConsumerReRegistrationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()), - () -> assertEquals("c12", dmaapConsumerReRegistrationConfig.consumerId()), - () -> assertEquals("OpenDcae-c12", dmaapConsumerReRegistrationConfig.consumerGroup()), - () -> assertEquals(Integer.valueOf(-1), dmaapConsumerReRegistrationConfig.timeoutMs()), - () -> assertEquals(Integer.valueOf(1), dmaapConsumerReRegistrationConfig.messageLimit()) + configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()), + () -> assertEquals("http", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()), + () -> assertEquals("", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()), + () -> assertEquals("", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()), + () -> assertEquals("c12", + configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()), + () -> assertEquals("OpenDcae-c12", + configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()), + () -> assertEquals(-1, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()), + () -> assertEquals(1, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit()) ); - DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig = - configuration.getDmaapCpeAuthenticationConsumerConfiguration(); assertAll("DMaaP Consumer CPE Authentication Configuration Properties", - () -> assertEquals("test localhost", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()), - () -> assertEquals(Integer.valueOf(1234), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()), + () -> assertEquals("test localhost", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()), + () -> assertEquals(1234, + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()), () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION", - dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()), - () -> assertEquals("http", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()), - () -> assertEquals("c12", dmaapConsumerCpeAuthenticationConfig.consumerId()), - () -> assertEquals("OpenDcae-c12", dmaapConsumerCpeAuthenticationConfig.consumerGroup()), - () -> assertEquals(Integer.valueOf(-1), dmaapConsumerCpeAuthenticationConfig.timeoutMs()), - () -> assertEquals(Integer.valueOf(1), dmaapConsumerCpeAuthenticationConfig.messageLimit()) + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()), + () -> assertEquals("http", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()), + () -> assertEquals("", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()), + () -> assertEquals("", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()), + () -> assertEquals("c12", + configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()), + () -> assertEquals("OpenDcae-c12", + configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()), + () -> assertEquals(-1, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()), + () -> assertEquals(1, + configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit()) ); - DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration(); assertAll("DMaaP Publisher Configuration Properties", - () -> assertEquals("test localhost", dmaapPublisherConfiguration.dmaapHostName()), - () -> assertEquals(Integer.valueOf(1234), dmaapPublisherConfiguration.dmaapPortNumber()), + () -> assertEquals("test localhost", + configuration.getDmaapProducerProperties().getDmaapHostName()), + () -> assertEquals(1234, configuration.getDmaapProducerProperties().getDmaapPortNumber()), () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT", - dmaapPublisherConfiguration.dmaapTopicName()), - () -> assertEquals("http", dmaapPublisherConfiguration.dmaapProtocol()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType()) + configuration.getDmaapProducerProperties().getDmaapTopicName()), + () -> assertEquals("http", configuration.getDmaapProducerProperties().getDmaapProtocol()), + () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserName()), + () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapProducerProperties().getDmaapContentType()) ); assertAll("Generic Application Properties", @@ -211,12 +229,11 @@ class ApplicationConfigurationTest { assertAll("Security Application Properties", () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()), - () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), - () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()), - () -> assertEquals("test key store password path", + () -> assertEquals("KeyStore.jks", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("KeyStorePass.txt", aaiClientConfiguration.keyStorePasswordPath()), - () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()), - () -> assertEquals("test trust store password path", + () -> assertEquals("KeyStore.jks", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("KeyStorePass.txt", aaiClientConfiguration.trustStorePasswordPath()) ); } @@ -298,10 +315,10 @@ class ApplicationConfigurationTest { .cpeAuthConfigKey("config_key_2") .closeLoopConfigKey("config_key_3") .loggingLevel("TRACE") - .keyStorePath("test key store path - update") - .keyStorePasswordPath("test key store password path - update") - .trustStorePath("test trust store path - update") - .trustStorePasswordPath("test trust store password path - update") + .keyStorePath("KeyStore-update.jks") + .keyStorePasswordPath("KeyStorePass-update.txt") + .trustStorePath("KeyStore-update.jks") + .trustStorePasswordPath("KeyStorePass-update.txt") .enableAaiCertAuth(true) .enableDmaapCertAuth(true) .streamSubscribesMap(subscribes) @@ -311,7 +328,7 @@ class ApplicationConfigurationTest { // Update the configuration configuration.updateCurrentConfiguration(updatedConfiguration); - AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration(); + var aaiClientConfiguration = configuration.getAaiClientConfiguration(); assertAll("AAI Client Configuration Properties", () -> assertEquals("aai.onap.svc.cluster.local", aaiClientConfiguration.aaiHost()), () -> assertEquals(Integer.valueOf(8443), aaiClientConfiguration.aaiPort()), @@ -327,50 +344,65 @@ class ApplicationConfigurationTest { aaiClientConfiguration.aaiHeaders().get("Content-Type")) ); - DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig = - configuration.getDmaapReRegistrationConsumerConfiguration(); assertAll("DMaaP Consumer Re-Registration Configuration Properties", - () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()), - () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()), - () -> assertEquals("events/unauthenticated.PNF_UPDATE", - dmaapConsumerReRegistrationConfig.dmaapTopicName()), - () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()), - () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()), - () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()), - () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()), - () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()), - () -> assertEquals(Integer.valueOf(5), dmaapConsumerReRegistrationConfig.timeoutMs()), - () -> assertEquals(Integer.valueOf(10), dmaapConsumerReRegistrationConfig.messageLimit()) + () -> assertEquals("we-are-message-router1.us", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()), + () -> assertEquals(3901, + configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()), + () -> assertEquals("unauthenticated.PNF_UPDATE", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()), + () -> assertEquals("https", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()), + () -> assertEquals("some-user", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()), + () -> assertEquals("some-password", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()), + () -> assertEquals("c13", + configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()), + () -> assertEquals("OpenDcae-c13", + configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()), + () -> assertEquals(5, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()), + () -> assertEquals(10, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit()) ); - DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig = - configuration.getDmaapCpeAuthenticationConsumerConfiguration(); assertAll("DMaaP Consumer CPE Authentication Configuration Properties", - () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()), - () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()), - () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION", - dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()), - () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()), - () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), - () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()), - () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()), - () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()), - () -> assertEquals(Integer.valueOf(5), dmaapConsumerCpeAuthenticationConfig.timeoutMs()), - () -> assertEquals(Integer.valueOf(10), dmaapConsumerCpeAuthenticationConfig.messageLimit()) + () -> assertEquals("we-are-message-router2.us", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()), + () -> assertEquals(3902, + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()), + () -> assertEquals("unauthenticated.CPE_AUTHENTICATION", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()), + () -> assertEquals("https", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()), + () -> assertEquals("some-user", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()), + () -> assertEquals("some-password", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()), + () -> assertEquals("c13", + configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()), + () -> assertEquals("OpenDcae-c13", + configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()), + () -> assertEquals(5, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()), + () -> assertEquals(10, + configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit()) ); - DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration(); assertAll("DMaaP Publisher Configuration Properties", - () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()), - () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()), - () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT", - dmaapPublisherConfiguration.dmaapTopicName()), - () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()), - () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()), - () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()), - () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType()) + () -> assertEquals("we-are-message-router3.us", + configuration.getDmaapProducerProperties().getDmaapHostName()), + () -> assertEquals(3903, configuration.getDmaapProducerProperties().getDmaapPortNumber()), + () -> assertEquals("unauthenticated.DCAE_CL_OUTPUT", + configuration.getDmaapProducerProperties().getDmaapTopicName()), + () -> assertEquals("https", configuration.getDmaapProducerProperties().getDmaapProtocol()), + () -> assertEquals("some-user", configuration.getDmaapProducerProperties().getDmaapUserName()), + () -> assertEquals("some-password", + configuration.getDmaapProducerProperties().getDmaapUserPassword()), + () -> assertEquals("application/json", + configuration.getDmaapProducerProperties().getDmaapContentType()) ); assertAll("Generic Application Properties", @@ -391,12 +423,11 @@ class ApplicationConfigurationTest { assertAll("Security Application Properties", () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()), - () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), - () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()), - () -> assertEquals("test key store password path - update", + () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("KeyStorePass-update.txt", aaiClientConfiguration.keyStorePasswordPath()), - () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()), - () -> assertEquals("test trust store password path - update", + () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("KeyStorePass-update.txt", aaiClientConfiguration.trustStorePasswordPath()) ); } diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java index 1acf864d..1d1bce2a 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java @@ -48,13 +48,13 @@ class ConsulConfigurationGatewayTest { } ConsulConfigurationGatewayTest() { - ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class); + var configuration = Mockito.mock(ApplicationConfiguration.class); this.configurationGateway = new ConsulConfigurationGateway(configuration); } @Test void passingValidJson_constructsGeneratedAppConfigObject() { - final String validJson = "{" + final var validJson = "{" + "\"dmaap.protocol\": \"http\"," + "\"dmaap.contentType\": \"application/json\"," + "\"dmaap.consumer.consumerId\": \"c12\"," @@ -219,7 +219,7 @@ class ConsulConfigurationGatewayTest { .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3)) .build(); - ConsulConfigurationGateway spiedGateway = Mockito.spy(configurationGateway); + var spiedGateway = Mockito.spy(configurationGateway); doReturn(false).when(spiedGateway).environmentNotReady(); assertEquals(expectedConfiguration, spiedGateway.generateAppConfigObject(jsonParser.parse(validJson).getAsJsonObject())); diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java index bacb6c3e..8cc7d5cc 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java @@ -42,7 +42,6 @@ import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.MvcResult; @WebMvcTest(BbsEventProcessorController.class) @DisplayName("BBS Event Processor Controllers MVC Unit-Tests") @@ -67,7 +66,7 @@ class BbsEventProcessorControllerTest { @Test void sendingHeartBeatRestCall_RespondsWithAlive() throws Exception { - MvcResult heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn(); + var heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn(); mockMvc.perform(asyncDispatch(heartBeatResult)) .andExpect(status().isOk()) @@ -76,7 +75,7 @@ class BbsEventProcessorControllerTest { @Test void sendingReRegistrationSubmissionRestCall_RespondsWithOk() throws Exception { - MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn(); + var reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn(); mockMvc.perform(asyncDispatch(reregistrationSubmissionResult)) .andExpect(status().isOk()) @@ -86,7 +85,7 @@ class BbsEventProcessorControllerTest { @Test void sendingCpeAuthenticationSubmissionRestCall_RespondsWithOk() throws Exception { - MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn(); + var reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn(); mockMvc.perform(asyncDispatch(reregistrationSubmissionResult)) .andExpect(status().isOk()) @@ -97,7 +96,7 @@ class BbsEventProcessorControllerTest { @Test void sendingStartTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception { when(scheduler.reScheduleProcessingTasks()).thenReturn(true); - MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn(); + var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn(); mockMvc.perform(asyncDispatch(startTasksResult)) .andExpect(status().isOk()) @@ -108,7 +107,7 @@ class BbsEventProcessorControllerTest { @Test void sendingStartTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception { when(scheduler.reScheduleProcessingTasks()).thenReturn(false); - MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn(); + var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn(); mockMvc.perform(asyncDispatch(startTasksResult)) .andExpect(status().isNotAcceptable()) @@ -119,7 +118,7 @@ class BbsEventProcessorControllerTest { @Test void sendingCancelTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception { when(scheduler.cancelScheduledProcessingTasks()).thenReturn(true); - MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn(); + var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn(); mockMvc.perform(asyncDispatch(cancellationResult)) .andExpect(status().isOk()) @@ -130,7 +129,7 @@ class BbsEventProcessorControllerTest { @Test void sendingCancelTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception { when(scheduler.cancelScheduledProcessingTasks()).thenReturn(false); - MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn(); + var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn(); mockMvc.perform(asyncDispatch(cancellationResult)) .andExpect(status().isNotAcceptable()) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java index fd43b8be..30cc4ab8 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java @@ -22,7 +22,6 @@ package org.onap.bbs.event.processor.model; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.TypeAdapterFactory; @@ -42,13 +41,13 @@ class GsonSerializationsTest { @Test void creatingReRegistrationJsonBody_returnsJsonInString() { - String correlationId = "NokiaCorrelationId"; - String attachmentPoint = "olt2/1/1"; - String remoteId = "RemoteId"; - String cvlan = "1005"; - String svlan = "100"; + var correlationId = "NokiaCorrelationId"; + var attachmentPoint = "olt2/1/1"; + var remoteId = "RemoteId"; + var cvlan = "1005"; + var svlan = "100"; - String template = "{" + var template = "{" + "\"correlationId\":\"%s\"," + "\"attachment-point\":\"%s\"," + "\"remote-id\":\"%s\"," @@ -65,7 +64,7 @@ class GsonSerializationsTest { .build(); - String expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan); + var expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan); assertEquals(expectedResult, new ReRegistrationJsonBodyBuilder().createJsonBody(model)); } @@ -73,14 +72,14 @@ class GsonSerializationsTest { @Test void creatingCpeAuthenticationJsonBody_returnsJsonInString() { - String correlationId = "NokiaCorrelationID"; - AuthenticationState oldAuthenticationState = AuthenticationState.IN_SERVICE; - AuthenticationState newAuthenticationState = AuthenticationState.OUT_OF_SERVICE; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var correlationId = "NokiaCorrelationID"; + var oldAuthenticationState = AuthenticationState.IN_SERVICE; + var newAuthenticationState = AuthenticationState.OUT_OF_SERVICE; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String template = "{" + var template = "{" + "\"correlationId\":\"%s\"," + "\"old-authentication-state\":\"%s\"," + "\"new-authentication-state\":\"%s\"," @@ -99,7 +98,7 @@ class GsonSerializationsTest { .build(); - String expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(), + var expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(), newAuthenticationState.getNameInOnap(), stateInterface, rgwMacAddress, swVersion); assertEquals(expectedResult, new CpeAuthenticationJsonBodyBuilder().createJsonBody(model)); @@ -108,27 +107,27 @@ class GsonSerializationsTest { @Test void creatingDcaeControlLoopJsonBody_returnsJsonInString() { - String closedLoopEventClient = "DCAE.BBS_mSInstance"; - String policyVersion = "1.0.0.5"; - String policyName = "CPE_Authentication"; - String policyScope = + var closedLoopEventClient = "DCAE.BBS_mSInstance"; + var policyVersion = "1.0.0.5"; + var policyName = "CPE_Authentication"; + var policyScope = "service=HSIAService,type=SampleType," + "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8"; - String targetType = "VM"; - long closedLoopAlarmStart = 1484677482204798L; - String closedLoopEventStatus = "ONSET"; - String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b"; - String version = "1.0.2"; - String target = "vserver.vserver-name"; - String requestId = "97964e10-686e-4790-8c45-bdfa61df770f"; - String from = "DCAE"; + var targetType = "VM"; + var closedLoopAlarmStart = 1484677482204798L; + var closedLoopEventStatus = "ONSET"; + var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b"; + var version = "1.0.2"; + var target = "vserver.vserver-name"; + var requestId = "97964e10-686e-4790-8c45-bdfa61df770f"; + var from = "DCAE"; Map<String, String> aaiEnrichmentData = new LinkedHashMap<>(); aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example"); aaiEnrichmentData.put("cvlan-id", "example cvlan-id"); aaiEnrichmentData.put("svlan-id", "example svlan-id"); - String template = "{" + var template = "{" + "\"closedLoopEventClient\":\"%s\"," + "\"policyVersion\":\"%s\"," + "\"policyName\":\"%s\"," @@ -165,7 +164,7 @@ class GsonSerializationsTest { .originator(from) .build(); - String expectedResult = String.format(template, + var expectedResult = String.format(template, closedLoopEventClient, policyVersion, policyName, @@ -185,14 +184,14 @@ class GsonSerializationsTest { @Test void pnfAaiObject_IsSerializedSuccessfully() { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - Gson gson = gsonBuilder.create(); + var gson = gsonBuilder.create(); - String pnfName = "NokiaCorrelationID"; - String swVersion = "1.2"; + var pnfName = "NokiaCorrelationID"; + var swVersion = "1.2"; - String template = "{" + var template = "{" + "\"pnf-name\":\"%s\"," + "\"in-maint\":true," + "\"sw-version\":\"%s\"," @@ -288,7 +287,7 @@ class GsonSerializationsTest { .build(); - String jsonPnfObject = String.format(template, pnfName, swVersion); + var jsonPnfObject = String.format(template, pnfName, swVersion); assertEquals(jsonPnfObject, gson.toJson(pnfAaiObject)); assertEquals(pnfAaiObject, gson.fromJson(jsonPnfObject, ImmutablePnfAaiObject.class)); @@ -297,14 +296,14 @@ class GsonSerializationsTest { @Test void serviceInstanceAaiObject_IsSerializedSuccessfully() { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - Gson gson = gsonBuilder.create(); + var gson = gsonBuilder.create(); - String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; - String orchestrationStatus = "active"; + var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; + var orchestrationStatus = "active"; - String template = "{" + var template = "{" + "\"service-instance-id\":\"%s\"," + "\"orchestration-status\":\"%s\"," + "\"relationship-list\":{" @@ -370,7 +369,7 @@ class GsonSerializationsTest { .build(); - String jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus); + var jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus); assertEquals(jsonServiceInstanceObject, gson.toJson(serviceInstanceAaiObject)); assertEquals(serviceInstanceAaiObject, gson.fromJson(jsonServiceInstanceObject, diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java index c4bef9df..50fcdefb 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java @@ -20,7 +20,7 @@ package org.onap.bbs.event.processor.pipelines; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; @@ -64,8 +64,7 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.springframework.http.HttpStatus; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -80,12 +79,12 @@ class CpeAuthenticationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private HttpResponse httpResponse; + private MessageRouterPublishResponse publishResponse; @BeforeEach void setup() { - httpResponse = Mockito.mock(HttpResponse.class); + publishResponse = Mockito.mock(MessageRouterPublishResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class); @@ -148,12 +147,12 @@ class CpeAuthenticationPipelineTest { @Test void noResponseFromAai_PipelineTimesOut() throws SSLException { - String pnfName = "olt1"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; + var pnfName = "olt1"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -181,13 +180,13 @@ class CpeAuthenticationPipelineTest { @Test void noResponseWhilePublishing_PipelineTimesOut() throws SSLException { - String pnfName = "olt1"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName = "olt1"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -199,12 +198,12 @@ class CpeAuthenticationPipelineTest { .swVersion(swVersion) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = + var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1); @@ -217,7 +216,7 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never()); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) @@ -230,13 +229,13 @@ class CpeAuthenticationPipelineTest { @Test void singleCorrectEvent_handleSuccessfully() throws SSLException { - String pnfName = "olt1"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName = "olt1"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -248,12 +247,12 @@ class CpeAuthenticationPipelineTest { .swVersion(swVersion) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = + var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -266,13 +265,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -281,16 +280,16 @@ class CpeAuthenticationPipelineTest { @Test void twoCorrectEvents_handleSuccessfully() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress1 = "00:0a:95:8d:78:16"; - final String rgwMacAddress2 = "00:0a:95:8d:78:17"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); - String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress1 = "00:0a:95:8d:78:16"; + var rgwMacAddress2 = "00:0a:95:8d:78:17"; + var swVersion = "1.2"; + var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); + var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -310,19 +309,19 @@ class CpeAuthenticationPipelineTest { .swVersion(swVersion) .build(); - PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1); - PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2); - ServiceInstanceAaiObject hsiCfsServiceInstance1 = + var pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1); + var pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2); + var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1); - ServiceInstanceAaiObject hsiCfsServiceInstance2 = + var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2); // Prepare Mocks - String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); - String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); - String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); + var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); + var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance1.getServiceInstanceId()); - String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance2.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -339,14 +338,14 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -355,12 +354,12 @@ class CpeAuthenticationPipelineTest { @Test void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException { - String pnfName = "olt1"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; + var pnfName = "olt1"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -389,90 +388,16 @@ class CpeAuthenticationPipelineTest { } @Test - void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException { - - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress1 = "00:0a:95:8d:78:16"; - final String rgwMacAddress2 = "00:0a:95:8d:78:17"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); - String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); - - // Prepare stubbed replies - CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder() - .correlationId(pnfName1) - .oldAuthenticationState(oldAuthenticationState) - .newAuthenticationState(newAuthenticationState) - .stateInterface(stateInterface) - .rgwMacAddress(rgwMacAddress1) - .swVersion(swVersion) - .build(); - CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder() - .correlationId(pnfName2) - .oldAuthenticationState(oldAuthenticationState) - .newAuthenticationState(newAuthenticationState) - .stateInterface(stateInterface) - .rgwMacAddress(rgwMacAddress2) - .swVersion(swVersion) - .build(); - - PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1); - PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2); - ServiceInstanceAaiObject hsiCfsServiceInstance1 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1); - ServiceInstanceAaiObject hsiCfsServiceInstance2 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, - "Having unmatched RGW MAC address"); - - // Prepare Mocks - String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); - String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); - String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", - hsiCfsServiceInstance1.getServiceInstanceId()); - String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", - hsiCfsServiceInstance2.getServiceInstanceId()); - - when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); - when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)) - .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent))); - - when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1)); - when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2)); - - when(aaiClientTask - .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1)) - .thenReturn(Mono.just(hsiCfsServiceInstance1)); - when(aaiClientTask - .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) - .thenReturn(Mono.just(hsiCfsServiceInstance2)); - - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); - - // Execute the pipeline - StepVerifier.create(pipeline.executePipeline()) - .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) - .verifyComplete(); - - verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); - } - - @Test void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -492,12 +417,12 @@ class CpeAuthenticationPipelineTest { .swVersion(swVersion) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = + var pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -510,13 +435,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -527,14 +452,14 @@ class CpeAuthenticationPipelineTest { @Test void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -554,12 +479,12 @@ class CpeAuthenticationPipelineTest { .swVersion(swVersion) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = + var pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -572,13 +497,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(aaiClientTask, times(2)) @@ -630,7 +555,7 @@ class CpeAuthenticationPipelineTest { private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId, String pnfName, String rgwMacAddress) { - String orchestrationStatus = "active"; + var orchestrationStatus = "active"; RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry = ImmutableRelationshipEntryAaiObject.builder() diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java index 9453db3d..815cb5cc 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java @@ -20,7 +20,7 @@ package org.onap.bbs.event.processor.pipelines; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; @@ -64,8 +64,7 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.springframework.http.HttpStatus; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -80,12 +79,12 @@ class ReRegistrationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private HttpResponse httpResponse; + private MessageRouterPublishResponse publishResponse; @BeforeEach void setup() { - httpResponse = Mockito.mock(HttpResponse.class); + publishResponse = Mockito.mock(MessageRouterPublishResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class); @@ -148,11 +147,11 @@ class ReRegistrationPipelineTest { @Test void noResponseFromAai_PipelineTimesOut() throws SSLException { - String pnfName = "olt1"; - String attachmentPoint = "olt2-2-2"; - String remoteId = "newRemoteId"; - String cvlan = "1005"; - String svlan = "100"; + var pnfName = "olt1"; + var attachmentPoint = "olt2-2-2"; + var remoteId = "newRemoteId"; + var cvlan = "1005"; + var svlan = "100"; // Prepare stubbed replies ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -179,12 +178,12 @@ class ReRegistrationPipelineTest { @Test void noResponseWhilePublishing_PipelineTimesOut() throws SSLException { - String pnfName = "olt1"; - String attachmentPoint = "olt2-2-2"; - String remoteId = "newRemoteId"; - String cvlan = "1005"; - String svlan = "100"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName = "olt1"; + var attachmentPoint = "olt2-2-2"; + var remoteId = "newRemoteId"; + var cvlan = "1005"; + var svlan = "100"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -195,12 +194,11 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan); + var pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1); @@ -213,7 +211,7 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never()); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) @@ -226,12 +224,12 @@ class ReRegistrationPipelineTest { @Test void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException { - String pnfName = "olt1"; - String attachmentPoint = "olt2-2-2"; - String remoteId = "newRemoteId"; - String cvlan = "1005"; - String svlan = "100"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName = "olt1"; + var attachmentPoint = "olt2-2-2"; + var remoteId = "newRemoteId"; + var cvlan = "1005"; + var svlan = "100"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -242,12 +240,12 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = + var pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -260,8 +258,8 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) @@ -276,12 +274,12 @@ class ReRegistrationPipelineTest { @Test void singleCorrectEvent_handleSuccessfully() throws SSLException { - String pnfName = "olt1"; - String attachmentPoint = "olt2-2-2"; - String remoteId = "newRemoteId"; - String cvlan = "1005"; - String svlan = "100"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName = "olt1"; + var attachmentPoint = "olt2-2-2"; + var remoteId = "newRemoteId"; + var cvlan = "1005"; + var svlan = "100"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -292,12 +290,11 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan); + var pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -310,13 +307,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -325,17 +322,17 @@ class ReRegistrationPipelineTest { @Test void twoCorrectEvents_handleSuccessfully() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - String attachmentPoint1 = "olt1-1-1"; - String attachmentPoint2 = "olt2-2-2"; - String remoteId1 = "newRemoteId1"; - String remoteId2 = "newRemoteId2"; - String cvlan1 = "1005"; - String cvlan2 = "1006"; - String svlan = "100"; - String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); - String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var attachmentPoint1 = "olt1-1-1"; + var attachmentPoint2 = "olt2-2-2"; + var remoteId1 = "newRemoteId1"; + var remoteId2 = "newRemoteId2"; + var cvlan1 = "1005"; + var cvlan2 = "1006"; + var svlan = "100"; + var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); + var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -353,19 +350,17 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1); - PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2); - ServiceInstanceAaiObject hsiCfsServiceInstance1 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1); - ServiceInstanceAaiObject hsiCfsServiceInstance2 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2); + var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1); + var pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2); + var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1); + var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2); // Prepare Mocks - String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); - String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); - String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); + var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); + var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance1.getServiceInstanceId()); - String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance2.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -382,14 +377,14 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -398,11 +393,11 @@ class ReRegistrationPipelineTest { @Test void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException { - String pnfName = "olt1"; - String attachmentPoint = "olt2-2-2"; - String remoteId = "newRemoteId"; - String cvlan = "1005"; - String svlan = "100"; + var pnfName = "olt1"; + var attachmentPoint = "olt2-2-2"; + var remoteId = "newRemoteId"; + var cvlan = "1005"; + var svlan = "100"; // Prepare stubbed replies ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -432,17 +427,17 @@ class ReRegistrationPipelineTest { @Test void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - String attachmentPoint1 = "olt1-1-1"; - String attachmentPoint2 = "olt2-2-2"; - String remoteId1 = "newRemoteId1"; - String remoteId2 = "newRemoteId2"; - String cvlan1 = "1005"; - String cvlan2 = "1006"; - String svlan = "100"; - String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); - String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var attachmentPoint1 = "olt1-1-1"; + var attachmentPoint2 = "olt2-2-2"; + var remoteId1 = "newRemoteId1"; + var remoteId2 = "newRemoteId2"; + var cvlan1 = "1005"; + var cvlan2 = "1006"; + var svlan = "100"; + var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString(); + var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -460,19 +455,17 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1); - PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2); - ServiceInstanceAaiObject hsiCfsServiceInstance1 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1); - ServiceInstanceAaiObject hsiCfsServiceInstance2 = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2); + var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1); + var pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2); + var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1); + var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2); // Prepare Mocks - String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); - String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); - String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1); + var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2); + var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance1.getServiceInstanceId()); - String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance2.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -489,13 +482,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -504,16 +497,16 @@ class ReRegistrationPipelineTest { @Test void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - String attachmentPoint1 = "olt1-1-1"; - String attachmentPoint2 = "olt2-2-2"; - String remoteId1 = "newRemoteId1"; - String remoteId2 = "newRemoteId2"; - String cvlan1 = "1005"; - String cvlan2 = "1006"; - String svlan = "100"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var attachmentPoint1 = "olt1-1-1"; + var attachmentPoint2 = "olt2-2-2"; + var remoteId1 = "newRemoteId1"; + var remoteId2 = "newRemoteId2"; + var cvlan1 = "1005"; + var cvlan2 = "1006"; + var svlan = "100"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -531,12 +524,11 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1); + var pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -549,13 +541,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -566,16 +558,16 @@ class ReRegistrationPipelineTest { @Test void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException { - String pnfName1 = "olt1"; - String pnfName2 = "olt2"; - String attachmentPoint1 = "olt1-1-1"; - String attachmentPoint2 = "olt2-2-2"; - String remoteId1 = "newRemoteId1"; - String remoteId2 = "newRemoteId2"; - String cvlan1 = "1005"; - String cvlan2 = "1006"; - String svlan = "100"; - String hsiCfsServiceInstanceId = UUID.randomUUID().toString(); + var pnfName1 = "olt1"; + var pnfName2 = "olt2"; + var attachmentPoint1 = "olt1-1-1"; + var attachmentPoint2 = "olt2-2-2"; + var remoteId1 = "newRemoteId1"; + var remoteId2 = "newRemoteId2"; + var cvlan1 = "1005"; + var cvlan2 = "1006"; + var svlan = "100"; + var hsiCfsServiceInstanceId = UUID.randomUUID().toString(); // Prepare stubbed replies ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder() @@ -593,12 +585,11 @@ class ReRegistrationPipelineTest { .sVlan(svlan) .build(); - PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId); - ServiceInstanceAaiObject hsiCfsServiceInstance = - constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2); + var pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId); + var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2); // Prepare Mocks - String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", + var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all", hsiCfsServiceInstance.getServiceInstanceId()); when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10); @@ -611,13 +602,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); + when(publishResponse.successful()).thenReturn(true); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertTrue(r.successful())) .verifyComplete(); verify(aaiClientTask, times(2)) @@ -719,7 +710,7 @@ class ReRegistrationPipelineTest { private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId, String pnfName, String cvlan) { - String orchestrationStatus = "active"; + var orchestrationStatus = "active"; RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry = ImmutableRelationshipEntryAaiObject.builder() diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java index f721ca7e..b037e2cb 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java @@ -50,9 +50,9 @@ class SchedulerTest { SchedulerTest() { configuration = Mockito.mock(ApplicationConfiguration.class); taskScheduler = Mockito.mock(TaskScheduler.class); - ReRegistrationPipeline reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class); - CpeAuthenticationPipeline cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class); - ConsulConfigurationGateway configurationGateway = Mockito.mock(ConsulConfigurationGateway.class); + var reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class); + var cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class); + var configurationGateway = Mockito.mock(ConsulConfigurationGateway.class); this.applicationScheduler = new Scheduler(configuration, configurationGateway, taskScheduler, reRegistrationPipeline, cpeAuthenticationPipeline); } @@ -60,7 +60,7 @@ class SchedulerTest { @Test void scheduleTasksWithValidSchedulingPeriod_Succeeds() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); - ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class); + var scheduledFuture = Mockito.mock(ScheduledFuture.class); when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class))) .thenReturn(scheduledFuture); @@ -75,8 +75,8 @@ class SchedulerTest { @Test void cancellingRunningTasksSucceeds_tasksAreDeleted() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); - ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture1 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture2 = Mockito.mock(ScheduledFuture.class); when(scheduledFuture1.cancel(false)).thenReturn(true); when(scheduledFuture2.cancel(false)).thenReturn(true); when(scheduledFuture1.isCancelled()).thenReturn(true); @@ -85,7 +85,7 @@ class SchedulerTest { .thenReturn(scheduledFuture1).thenReturn(scheduledFuture2); applicationScheduler.setupScheduler(); - boolean result = applicationScheduler.cancelScheduledProcessingTasks(); + var result = applicationScheduler.cancelScheduledProcessingTasks(); assertAll("Successfully cancelling tasks", () -> assertTrue(result, "Result of cancellation task"), () -> assertEquals(0, applicationScheduler.numberOfTotalTasks(), "Total tasks"), @@ -97,8 +97,8 @@ class SchedulerTest { @Test void cancellingRunningTasksPartiallyFailing_tasksAreNotDeleted() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); - ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture1 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture2 = Mockito.mock(ScheduledFuture.class); when(scheduledFuture1.cancel(false)).thenReturn(true); when(scheduledFuture2.cancel(false)).thenReturn(false); when(scheduledFuture1.isCancelled()).thenReturn(true); @@ -107,7 +107,7 @@ class SchedulerTest { .thenReturn(scheduledFuture1).thenReturn(scheduledFuture2); applicationScheduler.setupScheduler(); - boolean result = applicationScheduler.cancelScheduledProcessingTasks(); + var result = applicationScheduler.cancelScheduledProcessingTasks(); assertAll("Partially cancelling tasks", () -> assertFalse(result, "Result of cancellation task"), () -> assertEquals(1, applicationScheduler.numberOfTotalTasks(), "Total tasks"), @@ -119,8 +119,8 @@ class SchedulerTest { @Test void cancellingRunningTasksFailingForAllOfThem_noTasksAreDeleted() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); - ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture1 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture2 = Mockito.mock(ScheduledFuture.class); when(scheduledFuture1.cancel(false)).thenReturn(false); when(scheduledFuture2.cancel(false)).thenReturn(false); when(scheduledFuture1.isCancelled()).thenReturn(false); @@ -129,7 +129,7 @@ class SchedulerTest { .thenReturn(scheduledFuture1).thenReturn(scheduledFuture2); applicationScheduler.setupScheduler(); - boolean result = applicationScheduler.cancelScheduledProcessingTasks(); + var result = applicationScheduler.cancelScheduledProcessingTasks(); assertAll("Failing in cancelling tasks", () -> assertFalse(result, "Result of cancellation task"), () -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"), @@ -141,15 +141,15 @@ class SchedulerTest { @Test void reSchedulingWithExistingActiveTasks_Fails() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); - ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture1 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture2 = Mockito.mock(ScheduledFuture.class); when(scheduledFuture1.isCancelled()).thenReturn(false); when(scheduledFuture2.isCancelled()).thenReturn(false); when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class))) .thenReturn(scheduledFuture1).thenReturn(scheduledFuture2); applicationScheduler.setupScheduler(); - boolean result = applicationScheduler.reScheduleProcessingTasks(); + var result = applicationScheduler.reScheduleProcessingTasks(); assertAll("Rescheduling with active tasks", () -> assertFalse(result, "Result of re-scheduling"), () -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"), @@ -162,11 +162,11 @@ class SchedulerTest { void reSchedulingWithExistingCancelledTasks_Succeeds() { when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20); // Initial tasks - ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture1 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture2 = Mockito.mock(ScheduledFuture.class); // Re-scheduled tasks - ScheduledFuture scheduledFuture3 = Mockito.mock(ScheduledFuture.class); - ScheduledFuture scheduledFuture4 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture3 = Mockito.mock(ScheduledFuture.class); + var scheduledFuture4 = Mockito.mock(ScheduledFuture.class); when(scheduledFuture1.isCancelled()).thenReturn(true); when(scheduledFuture2.isCancelled()).thenReturn(true); when(scheduledFuture3.isCancelled()).thenReturn(false); @@ -178,7 +178,7 @@ class SchedulerTest { .thenReturn(scheduledFuture4); applicationScheduler.setupScheduler(); - boolean result = applicationScheduler.reScheduleProcessingTasks(); + var result = applicationScheduler.reScheduleProcessingTasks(); assertAll("Rescheduling with cancelled tasks", () -> assertTrue(result, "Result of re-scheduling"), () -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"), diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java index db5f7cb1..e13b8bc1 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java @@ -64,7 +64,7 @@ class AaiClientTaskImplTest { @BeforeEach void init() { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); reactiveClient = Mockito.mock(AaiReactiveClient.class); task = new AaiClientTaskImpl(reactiveClient); @@ -73,7 +73,7 @@ class AaiClientTaskImplTest { @Test void passingEmptyPnfObject_NothingHappens() throws AaiTaskException { when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.empty()); - Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Empty PNF task", "some-url"); + var pnf = task.executePnfRetrieval("Empty PNF task", "some-url"); verify(reactiveClient).getPnfObjectDataFor("some-url"); assertNull(pnf.block(Duration.ofSeconds(5))); @@ -82,7 +82,7 @@ class AaiClientTaskImplTest { @Test void passingEmptyServiceInstanceObject_NothingHappens() throws AaiTaskException { when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class))).thenReturn(Mono.empty()); - Mono<ServiceInstanceAaiObject> serviceInstance = + var serviceInstance = task.executeServiceInstanceRetrieval("Empty Service Instance task", "some-url"); verify(reactiveClient).getServiceInstanceObjectDataFor("some-url"); @@ -92,8 +92,8 @@ class AaiClientTaskImplTest { @Test void passingPnfObject_taskSucceeds() throws AaiTaskException { - String pnfName = "pnf-1"; - String attachmentPoint = "olt1-1-1"; + var pnfName = "pnf-1"; + var attachmentPoint = "olt1-1-1"; // Build Relationship Data RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry = @@ -142,7 +142,7 @@ class AaiClientTaskImplTest { .build(); when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.just(pnfAaiObject)); - Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url"); + var pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url"); verify(reactiveClient).getPnfObjectDataFor("some-url"); assertNotNull(pnf.block(Duration.ofSeconds(5))); @@ -151,7 +151,7 @@ class AaiClientTaskImplTest { .expectSubscription() .consumeNextWith(aPnf -> { Assertions.assertEquals(pnfName, aPnf.getPnfName(), "PNF Name in response does not match"); - String extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries() + var extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> e.getRelatedTo().equals("logical-link")) .flatMap(e -> e.getRelationshipData().stream()) @@ -167,8 +167,8 @@ class AaiClientTaskImplTest { @Test void passingServiceInstanceObject_taskSucceeds() throws AaiTaskException { - String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; - String orchestrationStatus = "active"; + var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; + var orchestrationStatus = "active"; // Build Relationship Data RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry = @@ -207,7 +207,7 @@ class AaiClientTaskImplTest { when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class))) .thenReturn(Mono.just(serviceInstanceAaiObject)); - Mono<ServiceInstanceAaiObject> serviceInstance = + var serviceInstance = task.executeServiceInstanceRetrieval("Normal Service Instance retrieval task", "some-url"); @@ -220,10 +220,10 @@ class AaiClientTaskImplTest { Assertions.assertEquals(serviceInstanceId, instance.getServiceInstanceId(), "Service Instance ID in response does not match"); - MetadataListAaiObject extractedMetadataListObject = + var extractedMetadataListObject = instance.getMetadataListAaiObject().orElseThrow(AaiClientTaskTestException::new); - MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry = + var extractedMetadataEntry = extractedMetadataListObject.getMetadataEntries() .stream() .filter(m -> m.getMetaname().equals("cvlan")) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java index 40bcb65d..f644d443 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java @@ -20,38 +20,39 @@ package org.onap.bbs.event.processor.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.gson.Gson; -import com.google.gson.JsonElement; - -import java.util.Optional; - -import javax.net.ssl.SSLException; import org.junit.Assert; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.DmaapCpeAuthenticationConsumerProperties; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; class DmaapCpeAuthenticationConsumerTaskImplTest { + private static final String DMAAP_PROTOCOL = "http"; + private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local"; + private static final int DMAAP_PORT = 3904; + private static final String DMAAP_TOPIC = "unauthenticated.CPE_AUTHENTICATION"; + private static final String SUBSCRIBER_ID = "subscriberID"; + private static final String SUBSCRIBER_GROUP = "subscriberGroup"; + private static final String CPE_AUTHENTICATION_EVENT_TEMPLATE = "{\"event\": {" + "\"commonEventHeader\": { \"sourceName\":\"%s\"}," + "\"stateChangeFields\": {" @@ -65,32 +66,34 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { private static DmaapCpeAuthenticationConsumerTask dmaapConsumerTask; private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel; - private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; private static String eventsArray; + private static MessageRouterSubscriber subscriber; private static Gson gson = new Gson(); + private static ApplicationConfiguration configuration; @BeforeAll - static void setUp() throws SSLException { - - final String sourceName = "PNF-CorrelationId"; - final String oldAuthenticationState = "outOfService"; - final String newAuthenticationState = "inService"; - final String stateInterface = "stateInterface"; - final String rgwMacAddress = "00:0a:95:8d:78:16"; - final String swVersion = "1.2"; + static void setUp() { // Mock Re-registration configuration - DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration(); - ApplicationConfiguration configuration = mock(ApplicationConfiguration.class); - when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); - - // Mock reactive DMaaP client - ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); - dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration); - - dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, - new CpeAuthenticationDmaapConsumerJsonParser(), httpClientFactory); + configuration = mock(ApplicationConfiguration.class); + var props = mock(DmaapCpeAuthenticationConsumerProperties.class); + when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL); + when(props.getDmaapHostName()).thenReturn(DMAAP_HOST); + when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT); + when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC); + when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID); + when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP); + when(configuration.getDmaapCpeAuthenticationConsumerProperties()).thenReturn(props); + + var subscriberConfig = mock(MessageRouterSubscriberConfig.class); + when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(subscriberConfig); + + var sourceName = "PNF-CorrelationId"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; cpeAuthenticationConsumerDmaapModel = ImmutableCpeAuthenticationConsumerDmaapModel.builder() .correlationId(sourceName) @@ -101,58 +104,42 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { .swVersion(swVersion) .build(); - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion); eventsArray = "[" + event + "]"; } - @AfterEach - void resetMock() { - reset(dMaaPConsumerReactiveHttpClient); - } - @Test void passingEmptyMessage_NothingHappens() throws Exception { - JsonElement empty = gson.toJsonTree(""); - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); + var empty = gson.toJsonTree(""); + subscriber = mock(MessageRouterSubscriber.class); + when(subscriber.getElements(any())).thenReturn(Flux.just(empty)); + + dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber, + new CpeAuthenticationDmaapConsumerJsonParser()); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); + + verify(subscriber, times(1)).getElements(any()); + verifyNoMoreInteractions(subscriber); } @Test void passingNormalMessage_ResponseSucceeds() throws Exception { - JsonElement normalEventsArray = gson.toJsonTree(eventsArray); - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) - .thenReturn(Mono.just(normalEventsArray)); + var normalEventsArray = gson.toJsonTree(eventsArray); + subscriber = mock(MessageRouterSubscriber.class); + when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray)); + + dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber, + new CpeAuthenticationDmaapConsumerJsonParser()); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); - } - - private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .consumerGroup("consumer-group") - .consumerId("consumer-id") - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapPortNumber(3904) - .dmaapProtocol("http") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .trustStorePath("change it") - .trustStorePasswordPath("change_it") - .keyStorePath("change it") - .keyStorePasswordPath("change_it") - .enableDmaapCertAuth(false) - .dmaapTopicName("/events/unauthenticated.CPE_AUTHENTICATION") - .timeoutMs(-1) - .messageLimit(-1) - .build(); + verify(subscriber, times(1)).getElements(any()); + verifyNoMoreInteractions(subscriber); } }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java index 436206d2..7c158489 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java @@ -21,7 +21,6 @@ package org.onap.bbs.event.processor.tasks; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -30,55 +29,52 @@ import static org.mockito.Mockito.when; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Optional; - -import javax.net.ssl.SSLException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.DmaapProducerProperties; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.springframework.http.HttpStatus; - -import reactor.core.publisher.Mono; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; + +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; class DmaapPublisherTaskImplTest { + private static final String DMAAP_PROTOCOL = "http"; + private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local"; + private static final int DMAAP_PORT = 3904; + private static final String DMAAP_TOPIC = "unauthenticated.DCAE_CL_OUTPUT"; + private static ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel; private static DmaapPublisherTaskImpl task; - private static DMaaPPublisherReactiveHttpClient reactiveHttpClient; private static ApplicationConfiguration configuration; - private static DmaapPublisherConfiguration dmaapPublisherConfiguration; @BeforeAll static void setUp() { - dmaapPublisherConfiguration = testVersionOfDmaapPublisherConfiguration(); configuration = mock(ApplicationConfiguration.class); - final String closedLoopEventClient = "DCAE.BBS_mSInstance"; - final String policyVersion = "1.0.0.5"; - final String policyName = "CPE_Authentication"; - final String policyScope = + final var closedLoopEventClient = "DCAE.BBS_mSInstance"; + final var policyVersion = "1.0.0.5"; + final var policyName = "CPE_Authentication"; + final var policyScope = "service=HSIAService,type=SampleType," + "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8"; - final String targetType = "VM"; - final long closedLoopAlarmStart = 1484677482204798L; - final String closedLoopEventStatus = "ONSET"; - final String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b"; - final String version = "1.0.2"; - final String target = "vserver.vserver-name"; - final String requestId = "97964e10-686e-4790-8c45-bdfa61df770f"; - final String from = "DCAE"; + final var targetType = "VM"; + final var closedLoopAlarmStart = 1484677482204798L; + final var closedLoopEventStatus = "ONSET"; + final var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b"; + final var version = "1.0.2"; + final var target = "vserver.vserver-name"; + final var requestId = "97964e10-686e-4790-8c45-bdfa61df770f"; + final var from = "DCAE"; final Map<String, String> aaiEnrichmentData = new LinkedHashMap<>(); aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example"); @@ -100,77 +96,64 @@ class DmaapPublisherTaskImplTest { .requestId(requestId) .originator(from) .build(); + var props = mock(DmaapProducerProperties.class); + when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL); + when(props.getDmaapHostName()).thenReturn(DMAAP_HOST); + when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT); + when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC); + when(configuration.getDmaapProducerProperties()).thenReturn(props); + + var publisherConfig = mock(MessageRouterPublisherConfig.class); + when(configuration.getDmaapPublisherConfiguration()).thenReturn(publisherConfig); + - when(configuration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); } @Test void passingNullMessage_ExceptionIsRaised() { - task = new DmaapPublisherTaskImpl(configuration); - Executable executableFunction = () -> task.execute(null); Assertions.assertThrows(DmaapException.class, executableFunction, "Input message is invalid"); } @Test - void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException { - HttpResponse response = setupMocks(HttpStatus.OK.value()); - - StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(response).verifyComplete(); - - verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - verifyNoMoreInteractions(reactiveHttpClient); - } + void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException { + var publisher = mock(MessageRouterPublisher.class); + task = new DmaapPublisherTaskImpl(configuration, publisher); - @Test - void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException { - HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value()); + var response = mockResponse(true); + when(publisher.put(any(),any())).thenReturn(Flux.just(response)); - StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(response).verifyComplete(); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)) + .expectSubscription() + .assertNext(r -> Assertions.assertTrue(r.successful())) + .verifyComplete(); - verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - verifyNoMoreInteractions(reactiveHttpClient); + verify(publisher, times(1)).put(any(),any()); + verifyNoMoreInteractions(publisher); } - // We can safely suppress unchecked assignment warning here since it is a mock class - @SuppressWarnings("unchecked") - private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException { - - HttpResponse response = mock(HttpResponse.class); - when(response.statusCode()).thenReturn(httpResponseCode); - - reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class); - when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class))) - .thenReturn(Mono.just(response)); + @Test + void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException { + var publisher = mock(MessageRouterPublisher.class); + task = new DmaapPublisherTaskImpl(configuration, publisher); - PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class); - doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration); + var response = mockResponse(false); + when(publisher.put(any(),any())).thenReturn(Flux.just(response)); - task = new DmaapPublisherTaskImpl(configuration, httpClientFactory); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)) + .expectSubscription() + .assertNext(r -> Assertions.assertFalse(r.successful())) + .verifyComplete(); - return response; + verify(publisher, times(1)).put(any(),any()); + verifyNoMoreInteractions(publisher); } - private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapPortNumber(3904) - .dmaapProtocol("http") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks") - .trustStorePasswordPath("change_it") - .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12") - .keyStorePasswordPath("change_it") - .enableDmaapCertAuth(false) - .dmaapTopicName("/events/unauthenticated.DCAE_CL_OUTPUT") - .build(); + private MessageRouterPublishResponse mockResponse(boolean isSuccess) { + var response = mock(MessageRouterPublishResponse.class); + when(response.successful()).thenReturn(isSuccess); + return response; } }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java index 72e28987..edbe511d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java @@ -20,38 +20,39 @@ package org.onap.bbs.event.processor.tasks; -import static org.mockito.Mockito.doReturn; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.gson.Gson; -import com.google.gson.JsonElement; - -import java.util.Optional; - -import javax.net.ssl.SSLException; import org.junit.Assert; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.bbs.event.processor.config.ApplicationConfiguration; +import org.onap.bbs.event.processor.config.DmaapReRegistrationConsumerProperties; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; class DmaapReRegistrationConsumerTaskImplTest { + private static final String DMAAP_PROTOCOL = "http"; + private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local"; + private static final int DMAAP_PORT = 3904; + private static final String DMAAP_TOPIC = "unauthenticated.PNF_REREGISTRATION"; + private static final String SUBSCRIBER_ID = "subscriberID"; + private static final String SUBSCRIBER_GROUP = "subscriberGroup"; + private static final String RE_REGISTRATION_EVENT_TEMPLATE = "{\"event\": {" + "\"commonEventHeader\": { \"sourceName\":\"%s\"}," + "\"additionalFields\": {" @@ -63,31 +64,33 @@ class DmaapReRegistrationConsumerTaskImplTest { private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask; private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel; - private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; private static String eventsArray; + private static MessageRouterSubscriber subscriber; private static Gson gson = new Gson(); + private static ApplicationConfiguration configuration; @BeforeAll - static void setUp() throws SSLException { - - final String sourceName = "PNF-CorrelationId"; - final String attachmentPoint = "olt2/2/2"; - final String remoteId = "remoteId"; - final String cvlan = "1005"; - final String svlan = "100"; + static void setUp() { // Mock Re-registration configuration - DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration(); - ApplicationConfiguration configuration = mock(ApplicationConfiguration.class); - when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); - - // Mock reactive DMaaP client - ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); - dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration); - - dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, - new ReRegistrationDmaapConsumerJsonParser(), httpClientFactory); + configuration = mock(ApplicationConfiguration.class); + var props = mock(DmaapReRegistrationConsumerProperties.class); + when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL); + when(props.getDmaapHostName()).thenReturn(DMAAP_HOST); + when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT); + when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC); + when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID); + when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP); + when(configuration.getDmaapReRegistrationConsumerProperties()).thenReturn(props); + + var subscriberConfig = mock(MessageRouterSubscriberConfig.class); + when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(subscriberConfig); + + var sourceName = "PNF-CorrelationId"; + var attachmentPoint = "olt2/2/2"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; reRegistrationConsumerDmaapModel = ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(sourceName) @@ -97,58 +100,43 @@ class DmaapReRegistrationConsumerTaskImplTest { .sVlan(svlan) .build(); - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId, cvlan, svlan); eventsArray = "[" + event + "]"; } - @AfterEach - void resetMock() { - reset(dMaaPConsumerReactiveHttpClient); - } - @Test void passingEmptyMessage_NothingHappens() { - JsonElement empty = gson.toJsonTree(""); - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); + var empty = gson.toJsonTree(""); + subscriber = mock(MessageRouterSubscriber.class); + when(subscriber.getElements(any())).thenReturn(Flux.just(empty)); + + dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber, + new ReRegistrationDmaapConsumerJsonParser()); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); + + verify(subscriber, times(1)).getElements(any()); + verifyNoMoreInteractions(subscriber); } @Test void passingNormalMessage_ResponseSucceeds() { - JsonElement normalEventsArray = gson.toJsonTree(eventsArray); - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) - .thenReturn(Mono.just(normalEventsArray)); + System.out.println("Events sent : " + eventsArray); + var normalEventsArray = gson.toJsonTree(eventsArray); + subscriber = mock(MessageRouterSubscriber.class); + when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray)); + + dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber, + new ReRegistrationDmaapConsumerJsonParser()); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); - } - - private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .consumerGroup("OpenDCAE-c12") - .consumerId("c12") - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapPortNumber(3904) - .dmaapProtocol("http") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks") - .trustStorePasswordPath("change_it") - .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12") - .keyStorePasswordPath("change_it") - .enableDmaapCertAuth(false) - .dmaapTopicName("/events/unauthenticated.PNF_REREGISTRATION") - .timeoutMs(-1) - .messageLimit(-1) - .build(); + verify(subscriber, times(1)).getElements(any()); + verifyNoMoreInteractions(subscriber); } }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java index 8e3c46ba..2876dd90 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java @@ -37,8 +37,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.ServiceLoader; -import javax.net.ssl.SSLException; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -74,13 +72,13 @@ class AaiReactiveClientTest { private static WireMockServer wireMockServer; @BeforeAll - static void init() throws SSLException { - GsonBuilder gsonBuilder = new GsonBuilder(); + static void init() { + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); gson = gsonBuilder.create(); - ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class); - AaiClientConfiguration aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class); + var configuration = Mockito.mock(ApplicationConfiguration.class); + var aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class); when(configuration.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration); when(aaiClientConfiguration.aaiUserName()).thenReturn("AAI"); when(aaiClientConfiguration.aaiUserPassword()).thenReturn("AAI"); @@ -106,10 +104,10 @@ class AaiReactiveClientTest { @Test void sendingReactiveRequestForPnf_Succeeds() { - String pnfName = "pnf-1"; - String attachmentPoint = "olt1-1-1"; + var pnfName = "pnf-1"; + var attachmentPoint = "olt1-1-1"; - String pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName); + var pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName); // Build Relationship Data RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry = @@ -166,7 +164,7 @@ class AaiReactiveClientTest { .expectSubscription() .consumeNextWith(pnf -> { Assertions.assertEquals(pnfName, pnf.getPnfName(), "PNF Name in response does not match"); - String extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries() + var extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries() .stream() .filter(e -> e.getRelatedTo().equals("logical-link")) .flatMap(e -> e.getRelationshipData().stream()) @@ -182,10 +180,10 @@ class AaiReactiveClientTest { @Test void sendingReactiveRequestForServiceInstance_Succeeds() { - String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; - String orchestrationStatus = "active"; + var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef"; + var orchestrationStatus = "active"; - String serviceInstanceUrl = + var serviceInstanceUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?format=resource_and_url", serviceInstanceId); @@ -237,10 +235,10 @@ class AaiReactiveClientTest { Assertions.assertEquals(serviceInstanceId, serviceInstance.getServiceInstanceId(), "Service Instance ID in response does not match"); - MetadataListAaiObject extractedMetadataListObject = + var extractedMetadataListObject = serviceInstance.getMetadataListAaiObject().orElseThrow(AaiReactiveClientTestException::new); - MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry = + var extractedMetadataEntry = extractedMetadataListObject.getMetadataEntries() .stream() .filter(m -> m.getMetaname().equals("cvlan")) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java index c7ad7935..d7970ae9 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java @@ -104,9 +104,8 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingNonJson_getIllegalStateException() { - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - new CpeAuthenticationDmaapConsumerJsonParser(); - JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); + var jsonReader = new JsonReader(new StringReader("not JSON")); jsonReader.setLenient(true); JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); @@ -118,8 +117,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingNoEvents_EmptyFluxIsReturned() { - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - new CpeAuthenticationDmaapConsumerJsonParser(); + var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() @@ -129,23 +127,22 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingOneCorrectEvent_validationSucceeds() { - String sourceName = "PNF-CorrelationId"; - String oldAuthenticationState = "outOfService"; - String newAuthenticationState = "inService"; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var sourceName = "PNF-CorrelationId"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; CpeAuthenticationConsumerDmaapModel expectedEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder() .correlationId(sourceName) @@ -164,30 +161,29 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingTwoCorrectEvents_validationSucceeds() { - String sourceName1 = "PNF-CorrelationId"; - String sourceName2 = "PNF-CorrelationId"; - String oldAuthenticationState = "outOfService"; - String newAuthenticationState = "inService"; - String stateInterface = "stateInterface"; - String rgwMacAddress1 = "00:0a:95:8d:78:16"; - String rgwMacAddress2 = "00:0a:95:8d:78:17"; - String swVersion = "1.2"; + var sourceName1 = "PNF-CorrelationId"; + var sourceName2 = "PNF-CorrelationId"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress1 = "00:0a:95:8d:78:16"; + var rgwMacAddress2 = "00:0a:95:8d:78:17"; + var swVersion = "1.2"; - String firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState, + var firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress1, swVersion); - String secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState, + var secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress2, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement1 = jsonParser.parse(firstEvent); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement1 = jsonParser.parse(firstEvent); Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1); - JsonElement jsonElement2 = jsonParser.parse(secondEvent); + var jsonElement2 = jsonParser.parse(secondEvent); Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; + var eventsArray = "[" + firstEvent + "," + secondEvent + "]"; CpeAuthenticationConsumerDmaapModel expectedFirstEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -217,22 +213,21 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingAuthenticationState_validationFails() { - String sourceName = "PNF-CorrelationId"; - String oldAuthenticationState = "outOfService"; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var sourceName = "PNF-CorrelationId"; + var oldAuthenticationState = "outOfService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName, oldAuthenticationState, stateInterface, rgwMacAddress, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -242,22 +237,21 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingSourceName_validationFails() { - String oldAuthenticationState = "outOfService"; - String newAuthenticationState = "inService"; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -267,22 +261,21 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingSourceNameValue_validationFails() { - String oldAuthenticationState = "outOfService"; - String newAuthenticationState = "inService"; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -292,22 +285,21 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingStateChangeFieldsHeader_validationFails() { - String oldAuthenticationState = "outOfService"; - String newAuthenticationState = "inService"; - String stateInterface = "stateInterface"; - String rgwMacAddress = "00:0a:95:8d:78:16"; - String swVersion = "1.2"; + var oldAuthenticationState = "outOfService"; + var newAuthenticationState = "inService"; + var stateInterface = "stateInterface"; + var rgwMacAddress = "00:0a:95:8d:78:16"; + var swVersion = "1.2"; - String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS, + var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS, oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion); - CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = - spy(new CpeAuthenticationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java index cd238e2d..6d78826b 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java @@ -93,8 +93,8 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingNonJson_getIllegalStateException() { - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); - JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); + var jsonReader = new JsonReader(new StringReader("not JSON")); jsonReader.setLenient(true); JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson))) @@ -105,7 +105,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingNoEvents_EmptyFluxIsReturned() { - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); + var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() .verifyComplete(); @@ -114,21 +114,21 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingOneCorrectEvent_validationSucceeds() { - String correlationId = "PNF-CorrelationId"; - String attachmentPoint = "olt1/1/1"; - String remoteId = "remoteId"; - String cvlan = "1005"; - String svlan = "100"; + var correlationId = "PNF-CorrelationId"; + var attachmentPoint = "olt1/1/1"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint, remoteId, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; ReRegistrationConsumerDmaapModel expectedEventObject = ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(correlationId) @@ -146,29 +146,29 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingTwoCorrectEvents_validationSucceeds() { - String correlationId1 = "PNF-CorrelationId1"; - String correlationId2 = "PNF-CorrelationId2"; - String attachmentPoint1 = "olt1/1/1"; - String attachmentPoint2 = "olt2/2/2"; - String remoteId1 = "remoteId1"; - String remoteId2 = "remoteId2"; - String cvlan = "1005"; - String svlan = "100"; + var correlationId1 = "PNF-CorrelationId1"; + var correlationId2 = "PNF-CorrelationId2"; + var attachmentPoint1 = "olt1/1/1"; + var attachmentPoint2 = "olt2/2/2"; + var remoteId1 = "remoteId1"; + var remoteId2 = "remoteId2"; + var cvlan = "1005"; + var svlan = "100"; - String firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1, + var firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1, remoteId1, cvlan, svlan); - String secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1, + var secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1, remoteId1, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement1 = jsonParser.parse(firstEvent); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement1 = jsonParser.parse(firstEvent); Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1); - JsonElement jsonElement2 = jsonParser.parse(secondEvent); + var jsonElement2 = jsonParser.parse(secondEvent); Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; + var eventsArray = "[" + firstEvent + "," + secondEvent + "]"; ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(correlationId1) @@ -194,23 +194,23 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingAttachmentPoint_validationFails() { - String correlationId = "PNF-CorrelationId"; - String remoteId = "remoteId"; - String cvlan = "1005"; - String svlan = "100"; + var correlationId = "PNF-CorrelationId"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT, correlationId, remoteId, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -220,23 +220,23 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingCorrelationId_validationFails() { - String attachmentPoint = "olt1/1/1"; - String remoteId = "remoteId"; - String cvlan = "1005"; - String svlan = "100"; + var attachmentPoint = "olt1/1/1"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID, attachmentPoint, remoteId, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -246,23 +246,23 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingCorrelationIdValue_validationFails() { - String attachmentPoint = "olt1/1/1"; - String remoteId = "remoteId"; - String cvlan = "1005"; - String svlan = "100"; + var attachmentPoint = "olt1/1/1"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE, attachmentPoint, remoteId, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() @@ -272,25 +272,25 @@ class ReRegistrationDmaapConsumerJsonParserTest { @Test void passingJsonWithMissingAdditionalFields_validationFails() { - String correlationId = "PNF-CorrelationId"; - String attachmentPoint = "olt1/1/1"; - String remoteId = "remoteId"; - String cvlan = "1005"; - String svlan = "100"; + var correlationId = "PNF-CorrelationId"; + var attachmentPoint = "olt1/1/1"; + var remoteId = "remoteId"; + var cvlan = "1005"; + var svlan = "100"; - String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS, + var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS, correlationId, attachmentPoint, remoteId, cvlan, svlan); - ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); - JsonElement jsonElement = jsonParser.parse(event); + var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser()); + var jsonElement = jsonParser.parse(event); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String eventsArray = "[" + event + "]"; + var eventsArray = "[" + event + "]"; StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() diff --git a/components/bbs-event-processor/src/test/resources/logback-test.xml b/components/bbs-event-processor/src/test/resources/logback-test.xml index 0b93a431..dd7dcb76 100644 --- a/components/bbs-event-processor/src/test/resources/logback-test.xml +++ b/components/bbs-event-processor/src/test/resources/logback-test.xml @@ -20,5 +20,5 @@ <include resource="org/springframework/boot/logging/logback/base.xml" /> <root level="ERROR"/> <logger name="org.springframework" level="ERROR"/> - <logger name="org.onap" level="WARN"/> + <logger name="org.onap.bbs" level="WARN"/> </configuration>
\ No newline at end of file |