diff options
author | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2019-04-10 13:04:11 +0300 |
---|---|---|
committer | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2019-04-10 13:04:11 +0300 |
commit | 47201b60365a87b993a8a93d0b1c1bb03cbca1d6 (patch) | |
tree | bd316f291b7208c33dc28a183fb7f34721250d3c /components/bbs-event-processor/src | |
parent | 35cc15e04411008b2f8094bbd3876e7a2daed587 (diff) |
Support for HTTPS certificates-based communication with A&AI and DMaaP
Also, upgraded DCAE-SDK to the latest 1.1.4 version
Change-Id: Ica59ab3107d9c0bcbf4dbaacf5063d4ceb8ed4b9
Issue-ID: DCAEGEN2-1354
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src')
21 files changed, 296 insertions, 174 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java index 981d9633..5022a693 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java @@ -174,6 +174,13 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { synchronized (this) { cbsPollingInterval = newConfiguration.cbsPollingIntervalSec(); + securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth()); + securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth()); + securityProperties.setKeyStorePath(newConfiguration.keyStorePath()); + securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath()); + securityProperties.setTrustStorePath(newConfiguration.trustStorePath()); + securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath()); + GeneratedAppConfigObject.StreamsObject reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(), "PNF Re-Registration"); @@ -181,6 +188,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername()); + dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword()); dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); @@ -196,6 +205,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername()); + dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword()); dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); @@ -211,6 +222,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername()); + dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword()); dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); constructDmaapProducerConfiguration(); @@ -361,7 +374,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { throw new ConfigurationParsingException("Wrong topic name structure"); } topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0])); - topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]); + topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]); return topicUrlInfo; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java index 1d27fc0a..607b3b31 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java @@ -39,6 +39,8 @@ import org.onap.bbs.event.processor.model.ImmutableDmaapInfo; import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableStreamsObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; @@ -123,12 +125,12 @@ public class ConsulConfigurationGateway { // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) EnvProperties env = EnvProperties.fromEnvironment(); - + CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext); // Create the client and use it to get the configuration cbsFetchPipeline = CbsClientFactory.createCbsClient(env) .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) .retry(e -> true) - .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period)) + .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period)) .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors); } @@ -178,6 +180,13 @@ public class ConsulConfigurationGateway { final String loggingLevel = configObject.get("application.loggingLevel").getAsString(); + final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); + final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); + final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); + final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); + final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); + final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); + final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes"); final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); @@ -211,6 +220,12 @@ public class ConsulConfigurationGateway { .cpeAuthConfigKey(cpeAuthConfigKey) .closeLoopConfigKey(closeLoopConfigKey) .loggingLevel(loggingLevel) + .keyStorePath(keyStorePath) + .keyStorePasswordPath(keyStorePasswordPath) + .trustStorePath(trustStorePath) + .trustStorePasswordPath(trustStorePasswordPath) + .enableAaiCertAuth(aaiEnableCertAuth) + .enableDmaapCertAuth(dmaapEnableCertAuth) .streamSubscribesMap(parseStreamsObjects(streamsSubscribes)) .streamPublishesMap(parseStreamsObjects(streamsPublishes)) .build(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java index 4fdb81be..41a8a34f 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java @@ -123,6 +123,25 @@ public interface GeneratedAppConfigObject { @SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel") String loggingLevel(); + @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath") + String trustStorePath(); + + @SerializedName(value = "application.ssl.trustStorePasswordPath", + alternate = "application.ssl.trustStorePasswordPath") + String trustStorePasswordPath(); + + @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath") + String keyStorePath(); + + @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath") + String keyStorePasswordPath(); + + @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth") + boolean enableAaiCertAuth(); + + @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth") + boolean enableDmaapCertAuth(); + @SerializedName(value = "streams_subscribes", alternate = "streams_subscribes") Map<String, StreamsObject> streamSubscribesMap(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java index 711ab185..a30903bb 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java @@ -49,11 +49,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -99,7 +99,7 @@ public class CpeAuthenticationPipeline { LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started"); } - Flux<ResponseEntity<String>> executePipeline() { + Flux<HttpResponse> executePipeline() { return // Consume CPE Authentication from DMaaP consumeCpeAuthenticationFromDmaap() @@ -111,11 +111,11 @@ public class CpeAuthenticationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpResponse responseCode) { + MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); LOGGER.info("CPE Authentication event successfully handled. " + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase()); + responseCode.statusCode(), responseCode.statusReason()); MDC.remove(RESPONSE_CODE); } @@ -171,8 +171,10 @@ public class CpeAuthenticationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving PNF: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -214,8 +216,10 @@ public class CpeAuthenticationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -226,7 +230,7 @@ public class CpeAuthenticationPipeline { }); } - private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) { + private Mono<HttpResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { return Mono.empty(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java index 9a42ed21..33a9aea7 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java @@ -48,11 +48,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -98,7 +98,7 @@ public class ReRegistrationPipeline { LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started"); } - Flux<ResponseEntity<String>> executePipeline() { + Flux<HttpResponse> executePipeline() { return // Consume Re-Registration from DMaaP consumeReRegistrationsFromDmaap() @@ -110,11 +110,11 @@ public class ReRegistrationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpResponse responseCode) { + MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); LOGGER.info("PNF Re-Registration event successfully handled. " + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase()); + responseCode.statusCode(), responseCode.statusReason()); MDC.remove(RESPONSE_CODE); } @@ -170,8 +170,10 @@ public class ReRegistrationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving PNF: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -219,8 +221,10 @@ public class ReRegistrationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -259,7 +263,7 @@ public class ReRegistrationPipeline { return isNotRelocation; } - private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) { + private Mono<HttpResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { return Mono.empty(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index e6bef523..da510281 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,6 +20,10 @@ package org.onap.bbs.event.processor.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; @@ -85,6 +89,8 @@ public class DmaapCpeAuthenticationConsumerTaskImpl public synchronized void updateConfiguration() { try { LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + LOGGER.info("Creating secure context with:\n {}", + this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); } catch (SSLException e) { LOGGER.error("SSL error while updating HTTP Client after a config update"); @@ -96,7 +102,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<String> response = httpClient.getDMaaPConsumerResponse(); + Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index bddd2ecc..749c4e53 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,11 +21,11 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.springframework.http.ResponseEntity; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import reactor.core.publisher.Mono; public interface DmaapPublisherTask { - Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); + Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 7b227211..283e5ef9 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,21 +20,24 @@ package org.onap.bbs.event.processor.tasks; +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @@ -59,7 +62,12 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration this.configuration = configuration; this.httpClientFactory = httpClientFactory; - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + try { + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + } catch (SSLException e) { + LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); + LOGGER.debug("SSL exception\n", e); + } } @PostConstruct @@ -75,17 +83,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + try { + LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + } catch (SSLException e) { + LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); + LOGGER.debug("SSL exception\n", e); + } } @Override - public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { + public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { if (controlLoopPublisherDmaapModel == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } - LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel); + LOGGER.info("Executing task for publishing control loop message"); + LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); } private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index 92f5a86f..e40037b1 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,6 +20,10 @@ package org.onap.bbs.event.processor.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; @@ -85,6 +89,8 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC public synchronized void updateConfiguration() { try { LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + LOGGER.info("Creating secure context with:\n {}", + this.configuration.getDmaapReRegistrationConsumerConfiguration()); httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); } catch (SSLException e) { LOGGER.error("SSL error while updating HTTP Client after a config update"); @@ -96,7 +102,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<String> response = httpClient.getDMaaPConsumerResponse(); + Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 19b81a80..84fc9f7d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -181,6 +181,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private SslContext createSslContext() throws SSLException { if (aaiClientConfiguration.enableAaiCertAuth()) { + LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration); return sslFactory.createSecureContext( aaiClientConfiguration.keyStorePath(), aaiClientConfiguration.keyStorePasswordPath(), diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java index 2bb5d98a..3cff4e65 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java @@ -37,7 +37,6 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import java.util.Optional; import java.util.stream.StreamSupport; @@ -79,20 +78,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser { * @param dmaapResponse Response from DMaaP * @return CPE Authentication Consumer DMaaP reactive model */ - public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) { + public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) { return dmaapResponse - .flatMapMany(this::parseToMono) - .flatMap(this::createTargetFlux); - } - - private Mono<JsonElement> parseToMono(String message) { - if (StringUtils.isEmpty(message)) { - LOGGER.warn("DMaaP response is empty"); - return Mono.empty(); - } - return Mono.fromCallable(() -> new JsonParser().parse(message)) - .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring")) - .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty()); + .flatMapMany(this::createTargetFlux); } private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java index 947d7a7c..9fe0c277 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java @@ -32,7 +32,6 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import java.util.Optional; import java.util.stream.StreamSupport; @@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser { * @param dmaapResponse Response from DMaaP * @return Re-Registration Consumer DMaaP reactive model */ - public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) { + public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) { return dmaapResponse - .flatMapMany(this::parseToMono) - .flatMap(this::createTargetFlux); - } - - private Mono<JsonElement> parseToMono(String message) { - if (StringUtils.isEmpty(message)) { - LOGGER.warn("DMaaP response is empty"); - return Mono.empty(); - } - return Mono.fromCallable(() -> new JsonParser().parse(message)) - .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring")) - .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty()); + .flatMapMany(this::createTargetFlux); } private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java index 220466bc..69fbb3f0 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java @@ -91,8 +91,8 @@ import org.springframework.test.context.TestPropertySource; "configs.security.trustStorePasswordPath=test trust store password path", "configs.security.keyStorePath=test key store path", "configs.security.keyStorePasswordPath=test key store password path", - "configs.security.enableDmaapCertAuth=true", - "configs.security.enableAaiCertAuth=true", + "configs.security.enableDmaapCertAuth=false", + "configs.security.enableAaiCertAuth=false", "configs.application.pipelinesPollingIntervalSec=30", "configs.application.pipelinesTimeoutSec=15", "configs.application.policyVersion=1.0.0", @@ -208,6 +208,17 @@ class ApplicationConfigurationTest { () -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()), () -> assertEquals("cpeAuthControlName", configuration.getCpeAuthenticationCloseLoopControlName()) ); + + assertAll("Security Application Properties", + () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()), + () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), + () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("test key store password path", + aaiClientConfiguration.keyStorePasswordPath()), + () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("test trust store password path", + aaiClientConfiguration.trustStorePasswordPath()) + ); } @Test @@ -287,6 +298,12 @@ class ApplicationConfigurationTest { .cpeAuthConfigKey("config_key_2") .closeLoopConfigKey("config_key_3") .loggingLevel("TRACE") + .keyStorePath("test key store path - update") + .keyStorePasswordPath("test key store password path - update") + .trustStorePath("test trust store path - update") + .trustStorePasswordPath("test trust store password path - update") + .enableAaiCertAuth(true) + .enableDmaapCertAuth(true) .streamSubscribesMap(subscribes) .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3)) .build(); @@ -315,11 +332,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Consumer Re-Registration Configuration Properties", () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()), () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.PNF_UPDATE", + () -> assertEquals("events/unauthenticated.PNF_UPDATE", dmaapConsumerReRegistrationConfig.dmaapTopicName()), () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()), + () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()), () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()), () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()), @@ -332,11 +349,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Consumer CPE Authentication Configuration Properties", () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()), () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION", + () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION", dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()), () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), + () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()), () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()), () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()), @@ -348,11 +365,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Publisher Configuration Properties", () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()), () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT", + () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT", dmaapPublisherConfiguration.dmaapTopicName()), () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()), + () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()), () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType()) ); @@ -371,5 +388,16 @@ class ApplicationConfigurationTest { () -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()), () -> assertEquals("controlName-update", configuration.getCpeAuthenticationCloseLoopControlName()) ); + + assertAll("Security Application Properties", + () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()), + () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), + () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("test key store password path - update", + aaiClientConfiguration.keyStorePasswordPath()), + () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("test trust store password path - update", + aaiClientConfiguration.trustStorePasswordPath()) + ); } }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java index 9f5ce6dc..1acf864d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java @@ -84,6 +84,12 @@ class ConsulConfigurationGatewayTest { + "\"application.cpeAuth.configKey\": \"config_key_1\"," + "\"application.closeLoop.configKey\": \"config_key_3\"," + "\"application.loggingLevel\": \"TRACE\"," + + "\"application.ssl.keyStorePath\": \"/opt/app/bbs-event-processor/etc/cert/key.p12\"," + + "\"application.ssl.keyStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/key.pass\"," + + "\"application.ssl.trustStorePath\": \"/opt/app/bbs-event-processor/etc/cert/trust.jks\"," + + "\"application.ssl.trustStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/trust.pass\"," + + "\"application.ssl.enableAaiCertAuth\": true," + + "\"application.ssl.enableDmaapCertAuth\": true," + "\"streams_subscribes\": {" + "\"config_key_1\": {" + "\"type\": \"message_router\"," @@ -203,6 +209,12 @@ class ConsulConfigurationGatewayTest { .cpeAuthConfigKey("config_key_1") .closeLoopConfigKey("config_key_3") .loggingLevel("TRACE") + .keyStorePath("/opt/app/bbs-event-processor/etc/cert/key.p12") + .keyStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/key.pass") + .trustStorePath("/opt/app/bbs-event-processor/etc/cert/trust.jks") + .trustStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/trust.pass") + .enableAaiCertAuth(true) + .enableDmaapCertAuth(true) .streamSubscribesMap(subscribes) .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3)) .build(); diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java index 76d9659c..c4bef9df 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java @@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -// We can safely suppress unchecked assignment warnings for the ResponseEntity mock -@SuppressWarnings("unchecked") @DisplayName("CPE Authentication Pipeline Unit-Tests") class CpeAuthenticationPipelineTest { @@ -82,12 +80,12 @@ class CpeAuthenticationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private ResponseEntity<String> responseEntity; + private HttpResponse httpResponse; @BeforeEach void setup() { - responseEntity = Mockito.mock(ResponseEntity.class); + httpResponse = Mockito.mock(HttpResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class); @@ -268,13 +266,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -341,14 +339,14 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -452,13 +450,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -512,13 +510,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -574,13 +572,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java index a1b6b148..9453db3d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java @@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -// We can safely suppress unchecked assignment warnings for the ResponseEntity mock -@SuppressWarnings("unchecked") @DisplayName("PNF Re-registration Pipeline Unit-Tests") class ReRegistrationPipelineTest { @@ -82,12 +80,12 @@ class ReRegistrationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private ResponseEntity<String> responseEntity; + private HttpResponse httpResponse; @BeforeEach void setup() { - responseEntity = Mockito.mock(ResponseEntity.class); + httpResponse = Mockito.mock(HttpResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class); @@ -262,8 +260,8 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) @@ -312,13 +310,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -384,14 +382,14 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -491,13 +489,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -551,13 +549,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -613,13 +611,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java index 538ff1de..40bcb65d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java @@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.net.ssl.SSLException; import org.junit.Assert; @@ -62,6 +67,7 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel; private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; private static String eventsArray; + private static Gson gson = new Gson(); @BeforeAll static void setUp() throws SSLException { @@ -108,22 +114,25 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { @Test void passingEmptyMessage_NothingHappens() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just("")); + JsonElement empty = gson.toJsonTree(""); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } @Test void passingNormalMessage_ResponseSucceeds() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(eventsArray)); + JsonElement normalEventsArray = gson.toJsonTree(eventsArray); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) + .thenReturn(Mono.just(normalEventsArray)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java index 199a43ec..436206d2 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java @@ -30,6 +30,9 @@ import static org.mockito.Mockito.when; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; + +import javax.net.ssl.SSLException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -39,12 +42,12 @@ import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -112,46 +115,46 @@ class DmaapPublisherTaskImplTest { } @Test - void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException { - ResponseEntity<String> responseEntity = setupMocks(HttpStatus.OK.value()); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK); + void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException { + HttpResponse response = setupMocks(HttpStatus.OK.value()); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(responseEntity).verifyComplete(); + .expectNext(response).verifyComplete(); verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); verifyNoMoreInteractions(reactiveHttpClient); } @Test - void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException { - ResponseEntity<String> responseEntity = setupMocks(HttpStatus.UNAUTHORIZED.value()); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED); + void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException { + HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value()); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(responseEntity).verifyComplete(); + .expectNext(response).verifyComplete(); verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); verifyNoMoreInteractions(reactiveHttpClient); } // We can safely suppress unchecked assignment warning here since it is a mock class @SuppressWarnings("unchecked") - private ResponseEntity<String> setupMocks(Integer httpResponseCode) { + private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException { - ResponseEntity<String> responseEntity = mock(ResponseEntity.class); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode)); + HttpResponse response = mock(HttpResponse.class); + when(response.statusCode()).thenReturn(httpResponseCode); reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class); - when(reactiveHttpClient.getDMaaPProducerResponse(any())) - .thenReturn(Mono.just(responseEntity)); + when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class))) + .thenReturn(Mono.just(response)); PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class); doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration); task = new DmaapPublisherTaskImpl(configuration, httpClientFactory); - return responseEntity; + return response; } private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java index c9a461d8..72e28987 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java @@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.net.ssl.SSLException; import org.junit.Assert; @@ -59,7 +64,8 @@ class DmaapReRegistrationConsumerTaskImplTest { private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask; private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel; private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; - private static String message; + private static String eventsArray; + private static Gson gson = new Gson(); @BeforeAll static void setUp() throws SSLException { @@ -91,12 +97,10 @@ class DmaapReRegistrationConsumerTaskImplTest { .sVlan(svlan) .build(); - message = String.format("[" + RE_REGISTRATION_EVENT_TEMPLATE + "]", - sourceName, - attachmentPoint, - remoteId, - cvlan, - svlan); + String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId, + cvlan, svlan); + + eventsArray = "[" + event + "]"; } @AfterEach @@ -105,23 +109,26 @@ class DmaapReRegistrationConsumerTaskImplTest { } @Test - void passingEmptyMessage_NothingHappens() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just("")); + void passingEmptyMessage_NothingHappens() { + JsonElement empty = gson.toJsonTree(""); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } @Test - void passingNormalMessage_ResponseSucceeds() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message)); + void passingNormalMessage_ResponseSucceeds() { + JsonElement normalEventsArray = gson.toJsonTree(eventsArray); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) + .thenReturn(Mono.just(normalEventsArray)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java index 4ca61f5e..c7ad7935 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; +import java.io.StringReader; import java.util.Optional; import org.junit.jupiter.api.BeforeAll; @@ -100,14 +102,17 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { } @Test - void passingNonJson_EmptyFluxIsReturned() { + void passingNonJson_getIllegalStateException() { CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); + JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + jsonReader.setLenient(true); + JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson))) .expectSubscription() - .verifyComplete(); + .verifyError(IllegalStateException.class); } @Test @@ -116,7 +121,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() .verifyComplete(); } @@ -151,7 +156,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { .swVersion(swVersion) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedEventObject); } @@ -182,7 +187,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + secondEvent + "]"; + String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; CpeAuthenticationConsumerDmaapModel expectedFirstEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -203,7 +208,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { .swVersion(swVersion) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedFirstEventObject) .expectNext(expectedSecondEventObject); @@ -229,7 +234,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -254,7 +259,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -279,7 +284,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -304,7 +309,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java index ca448dd0..cd238e2d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; +import java.io.StringReader; import java.util.Optional; import org.junit.jupiter.api.BeforeAll; @@ -89,21 +91,22 @@ class ReRegistrationDmaapConsumerJsonParserTest { } @Test - void passingNonJson_EmptyFluxIsReturned() { + void passingNonJson_getIllegalStateException() { ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); - - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON"))) + JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + jsonReader.setLenient(true); + JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson))) .expectSubscription() - .verifyComplete(); + .verifyError(IllegalStateException.class); } @Test void passingNoEvents_EmptyFluxIsReturned() { ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); - - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() .verifyComplete(); } @@ -135,7 +138,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { .sVlan(svlan) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedEventObject); } @@ -165,7 +168,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + secondEvent + "]"; + String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(correlationId1) @@ -182,7 +185,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { .sVlan(svlan) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedFirstEventObject) .expectNext(expectedSecondEventObject); @@ -209,7 +212,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -235,7 +238,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -261,7 +264,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -289,7 +292,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } |