summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src
diff options
context:
space:
mode:
authorStavros Kanarakis <stavros.kanarakis@nokia.com>2019-04-10 13:04:11 +0300
committerStavros Kanarakis <stavros.kanarakis@nokia.com>2019-04-10 13:04:11 +0300
commit47201b60365a87b993a8a93d0b1c1bb03cbca1d6 (patch)
treebd316f291b7208c33dc28a183fb7f34721250d3c /components/bbs-event-processor/src
parent35cc15e04411008b2f8094bbd3876e7a2daed587 (diff)
Support for HTTPS certificates-based communication with A&AI and DMaaP
Also, upgraded DCAE-SDK to the latest 1.1.4 version Change-Id: Ica59ab3107d9c0bcbf4dbaacf5063d4ceb8ed4b9 Issue-ID: DCAEGEN2-1354 Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java15
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java19
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java19
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java24
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java24
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java8
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java27
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java8
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java1
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java16
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java16
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java50
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java12
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java40
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java44
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java17
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java37
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java33
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java27
-rw-r--r--components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java29
21 files changed, 296 insertions, 174 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 981d9633..5022a693 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -174,6 +174,13 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
synchronized (this) {
cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
+ securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth());
+ securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth());
+ securityProperties.setKeyStorePath(newConfiguration.keyStorePath());
+ securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
+ securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
+ securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+
GeneratedAppConfigObject.StreamsObject reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
@@ -181,6 +188,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername());
+ dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword());
dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -196,6 +205,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword());
dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -211,6 +222,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername());
+ dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
constructDmaapProducerConfiguration();
@@ -361,7 +374,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
throw new ConfigurationParsingException("Wrong topic name structure");
}
topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
- topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
return topicUrlInfo;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 1d27fc0a..607b3b31 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -39,6 +39,8 @@ import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
@@ -123,12 +125,12 @@ public class ConsulConfigurationGateway {
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
EnvProperties env = EnvProperties.fromEnvironment();
-
+ CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
- .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
+ .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
.subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
}
@@ -178,6 +180,13 @@ public class ConsulConfigurationGateway {
final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+
final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
@@ -211,6 +220,12 @@ public class ConsulConfigurationGateway {
.cpeAuthConfigKey(cpeAuthConfigKey)
.closeLoopConfigKey(closeLoopConfigKey)
.loggingLevel(loggingLevel)
+ .keyStorePath(keyStorePath)
+ .keyStorePasswordPath(keyStorePasswordPath)
+ .trustStorePath(trustStorePath)
+ .trustStorePasswordPath(trustStorePasswordPath)
+ .enableAaiCertAuth(aaiEnableCertAuth)
+ .enableDmaapCertAuth(dmaapEnableCertAuth)
.streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
.streamPublishesMap(parseStreamsObjects(streamsPublishes))
.build();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
index 4fdb81be..41a8a34f 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
@@ -123,6 +123,25 @@ public interface GeneratedAppConfigObject {
@SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel")
String loggingLevel();
+ @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath")
+ String trustStorePath();
+
+ @SerializedName(value = "application.ssl.trustStorePasswordPath",
+ alternate = "application.ssl.trustStorePasswordPath")
+ String trustStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath")
+ String keyStorePath();
+
+ @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath")
+ String keyStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth")
+ boolean enableAaiCertAuth();
+
+ @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth")
+ boolean enableDmaapCertAuth();
+
@SerializedName(value = "streams_subscribes", alternate = "streams_subscribes")
Map<String, StreamsObject> streamSubscribesMap();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index 711ab185..a30903bb 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -49,11 +49,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -99,7 +99,7 @@ public class CpeAuthenticationPipeline {
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
@@ -111,11 +111,11 @@ public class CpeAuthenticationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("CPE Authentication event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
@@ -171,8 +171,10 @@ public class CpeAuthenticationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -214,8 +216,10 @@ public class CpeAuthenticationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -226,7 +230,7 @@ public class CpeAuthenticationPipeline {
});
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 9a42ed21..33a9aea7 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -48,11 +48,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -98,7 +98,7 @@ public class ReRegistrationPipeline {
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
@@ -110,11 +110,11 @@ public class ReRegistrationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("PNF Re-Registration event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
@@ -170,8 +170,10 @@ public class ReRegistrationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -219,8 +221,10 @@ public class ReRegistrationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -259,7 +263,7 @@ public class ReRegistrationPipeline {
return isNotRelocation;
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index e6bef523..da510281 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,6 +20,10 @@
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index bddd2ecc..749c4e53 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.springframework.http.ResponseEntity;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import reactor.core.publisher.Mono;
public interface DmaapPublisherTask {
- Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 7b227211..283e5ef9 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,21 +20,24 @@
package org.onap.bbs.event.processor.tasks;
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@@ -59,7 +62,12 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
this.configuration = configuration;
this.httpClientFactory = httpClientFactory;
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@PostConstruct
@@ -75,17 +83,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@Override
- public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
+ public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
if (controlLoopPublisherDmaapModel == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
- LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel);
+ LOGGER.info("Executing task for publishing control loop message");
+ LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
}
private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index 92f5a86f..e40037b1 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,6 +20,10 @@
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapReRegistrationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 19b81a80..84fc9f7d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -181,6 +181,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
private SslContext createSslContext() throws SSLException {
if (aaiClientConfiguration.enableAaiCertAuth()) {
+ LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
return sslFactory.createSecureContext(
aaiClientConfiguration.keyStorePath(),
aaiClientConfiguration.keyStorePasswordPath(),
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
index 2bb5d98a..3cff4e65 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
@@ -37,7 +37,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
@@ -79,20 +78,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
* @param dmaapResponse Response from DMaaP
* @return CPE Authentication Consumer DMaaP reactive model
*/
- public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 947d7a7c..9fe0c277 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -32,7 +32,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
@@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser {
* @param dmaapResponse Response from DMaaP
* @return Re-Registration Consumer DMaaP reactive model
*/
- public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
index 220466bc..69fbb3f0 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
@@ -91,8 +91,8 @@ import org.springframework.test.context.TestPropertySource;
"configs.security.trustStorePasswordPath=test trust store password path",
"configs.security.keyStorePath=test key store path",
"configs.security.keyStorePasswordPath=test key store password path",
- "configs.security.enableDmaapCertAuth=true",
- "configs.security.enableAaiCertAuth=true",
+ "configs.security.enableDmaapCertAuth=false",
+ "configs.security.enableAaiCertAuth=false",
"configs.application.pipelinesPollingIntervalSec=30",
"configs.application.pipelinesTimeoutSec=15",
"configs.application.policyVersion=1.0.0",
@@ -208,6 +208,17 @@ class ApplicationConfigurationTest {
() -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()),
() -> assertEquals("cpeAuthControlName", configuration.getCpeAuthenticationCloseLoopControlName())
);
+
+ assertAll("Security Application Properties",
+ () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()),
+ () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+ () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("test key store password path",
+ aaiClientConfiguration.keyStorePasswordPath()),
+ () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("test trust store password path",
+ aaiClientConfiguration.trustStorePasswordPath())
+ );
}
@Test
@@ -287,6 +298,12 @@ class ApplicationConfigurationTest {
.cpeAuthConfigKey("config_key_2")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
+ .keyStorePath("test key store path - update")
+ .keyStorePasswordPath("test key store password path - update")
+ .trustStorePath("test trust store path - update")
+ .trustStorePasswordPath("test trust store password path - update")
+ .enableAaiCertAuth(true)
+ .enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
@@ -315,11 +332,11 @@ class ApplicationConfigurationTest {
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
() -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.PNF_UPDATE",
+ () -> assertEquals("events/unauthenticated.PNF_UPDATE",
dmaapConsumerReRegistrationConfig.dmaapTopicName()),
() -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
() -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()),
() -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()),
@@ -332,11 +349,11 @@ class ApplicationConfigurationTest {
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
() -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION",
+ () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION",
dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
() -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
() -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()),
() -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
@@ -348,11 +365,11 @@ class ApplicationConfigurationTest {
assertAll("DMaaP Publisher Configuration Properties",
() -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT",
+ () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT",
dmaapPublisherConfiguration.dmaapTopicName()),
() -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
);
@@ -371,5 +388,16 @@ class ApplicationConfigurationTest {
() -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()),
() -> assertEquals("controlName-update", configuration.getCpeAuthenticationCloseLoopControlName())
);
+
+ assertAll("Security Application Properties",
+ () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()),
+ () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+ () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("test key store password path - update",
+ aaiClientConfiguration.keyStorePasswordPath()),
+ () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("test trust store password path - update",
+ aaiClientConfiguration.trustStorePasswordPath())
+ );
}
} \ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
index 9f5ce6dc..1acf864d 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
@@ -84,6 +84,12 @@ class ConsulConfigurationGatewayTest {
+ "\"application.cpeAuth.configKey\": \"config_key_1\","
+ "\"application.closeLoop.configKey\": \"config_key_3\","
+ "\"application.loggingLevel\": \"TRACE\","
+ + "\"application.ssl.keyStorePath\": \"/opt/app/bbs-event-processor/etc/cert/key.p12\","
+ + "\"application.ssl.keyStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/key.pass\","
+ + "\"application.ssl.trustStorePath\": \"/opt/app/bbs-event-processor/etc/cert/trust.jks\","
+ + "\"application.ssl.trustStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/trust.pass\","
+ + "\"application.ssl.enableAaiCertAuth\": true,"
+ + "\"application.ssl.enableDmaapCertAuth\": true,"
+ "\"streams_subscribes\": {"
+ "\"config_key_1\": {"
+ "\"type\": \"message_router\","
@@ -203,6 +209,12 @@ class ConsulConfigurationGatewayTest {
.cpeAuthConfigKey("config_key_1")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
+ .keyStorePath("/opt/app/bbs-event-processor/etc/cert/key.p12")
+ .keyStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/key.pass")
+ .trustStorePath("/opt/app/bbs-event-processor/etc/cert/trust.jks")
+ .trustStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/trust.pass")
+ .enableAaiCertAuth(true)
+ .enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
index 76d9659c..c4bef9df 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
@@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
@DisplayName("CPE Authentication Pipeline Unit-Tests")
class CpeAuthenticationPipelineTest {
@@ -82,12 +80,12 @@ class CpeAuthenticationPipelineTest {
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private ResponseEntity<String> responseEntity;
+ private HttpResponse httpResponse;
@BeforeEach
void setup() {
- responseEntity = Mockito.mock(ResponseEntity.class);
+ httpResponse = Mockito.mock(HttpResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
@@ -268,13 +266,13 @@ class CpeAuthenticationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -341,14 +339,14 @@ class CpeAuthenticationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -452,13 +450,13 @@ class CpeAuthenticationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -512,13 +510,13 @@ class CpeAuthenticationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -574,13 +572,13 @@ class CpeAuthenticationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
index a1b6b148..9453db3d 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
@@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
@DisplayName("PNF Re-registration Pipeline Unit-Tests")
class ReRegistrationPipelineTest {
@@ -82,12 +80,12 @@ class ReRegistrationPipelineTest {
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private ResponseEntity<String> responseEntity;
+ private HttpResponse httpResponse;
@BeforeEach
void setup() {
- responseEntity = Mockito.mock(ResponseEntity.class);
+ httpResponse = Mockito.mock(HttpResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
@@ -262,8 +260,8 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -312,13 +310,13 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -384,14 +382,14 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -491,13 +489,13 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -551,13 +549,13 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -613,13 +611,13 @@ class ReRegistrationPipelineTest {
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
index 538ff1de..40bcb65d 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
@@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.net.ssl.SSLException;
import org.junit.Assert;
@@ -62,6 +67,7 @@ class DmaapCpeAuthenticationConsumerTaskImplTest {
private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel;
private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static Gson gson = new Gson();
@BeforeAll
static void setUp() throws SSLException {
@@ -108,22 +114,25 @@ class DmaapCpeAuthenticationConsumerTaskImplTest {
@Test
void passingEmptyMessage_NothingHappens() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+ JsonElement empty = gson.toJsonTree("");
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
@Test
void passingNormalMessage_ResponseSucceeds() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(eventsArray));
+ JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+ .thenReturn(Mono.just(normalEventsArray));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
index 199a43ec..436206d2 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
@@ -30,6 +30,9 @@ import static org.mockito.Mockito.when;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
+
+import javax.net.ssl.SSLException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -39,12 +42,12 @@ import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -112,46 +115,46 @@ class DmaapPublisherTaskImplTest {
}
@Test
- void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException {
- ResponseEntity<String> responseEntity = setupMocks(HttpStatus.OK.value());
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+ void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException {
+ HttpResponse response = setupMocks(HttpStatus.OK.value());
+
StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(responseEntity).verifyComplete();
+ .expectNext(response).verifyComplete();
verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
verifyNoMoreInteractions(reactiveHttpClient);
}
@Test
- void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException {
- ResponseEntity<String> responseEntity = setupMocks(HttpStatus.UNAUTHORIZED.value());
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED);
+ void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException {
+ HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value());
+
StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(responseEntity).verifyComplete();
+ .expectNext(response).verifyComplete();
verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
verifyNoMoreInteractions(reactiveHttpClient);
}
// We can safely suppress unchecked assignment warning here since it is a mock class
@SuppressWarnings("unchecked")
- private ResponseEntity<String> setupMocks(Integer httpResponseCode) {
+ private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException {
- ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
+ HttpResponse response = mock(HttpResponse.class);
+ when(response.statusCode()).thenReturn(httpResponseCode);
reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
- when(reactiveHttpClient.getDMaaPProducerResponse(any()))
- .thenReturn(Mono.just(responseEntity));
+ when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class)))
+ .thenReturn(Mono.just(response));
PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration);
task = new DmaapPublisherTaskImpl(configuration, httpClientFactory);
- return responseEntity;
+ return response;
}
private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() {
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
index c9a461d8..72e28987 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
@@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.net.ssl.SSLException;
import org.junit.Assert;
@@ -59,7 +64,8 @@ class DmaapReRegistrationConsumerTaskImplTest {
private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask;
private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel;
private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
- private static String message;
+ private static String eventsArray;
+ private static Gson gson = new Gson();
@BeforeAll
static void setUp() throws SSLException {
@@ -91,12 +97,10 @@ class DmaapReRegistrationConsumerTaskImplTest {
.sVlan(svlan)
.build();
- message = String.format("[" + RE_REGISTRATION_EVENT_TEMPLATE + "]",
- sourceName,
- attachmentPoint,
- remoteId,
- cvlan,
- svlan);
+ String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
+ cvlan, svlan);
+
+ eventsArray = "[" + event + "]";
}
@AfterEach
@@ -105,23 +109,26 @@ class DmaapReRegistrationConsumerTaskImplTest {
}
@Test
- void passingEmptyMessage_NothingHappens() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+ void passingEmptyMessage_NothingHappens() {
+ JsonElement empty = gson.toJsonTree("");
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
@Test
- void passingNormalMessage_ResponseSucceeds() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message));
+ void passingNormalMessage_ResponseSucceeds() {
+ JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+ .thenReturn(Mono.just(normalEventsArray));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
index 4ca61f5e..c7ad7935 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
+import java.io.StringReader;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
@@ -100,14 +102,17 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
}
@Test
- void passingNonJson_EmptyFluxIsReturned() {
+ void passingNonJson_getIllegalStateException() {
CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
new CpeAuthenticationDmaapConsumerJsonParser();
+ JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ jsonReader.setLenient(true);
+ JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
.expectSubscription()
- .verifyComplete();
+ .verifyError(IllegalStateException.class);
}
@Test
@@ -116,7 +121,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
new CpeAuthenticationDmaapConsumerJsonParser();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
}
@@ -151,7 +156,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
.swVersion(swVersion)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedEventObject);
}
@@ -182,7 +187,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + secondEvent + "]";
+ String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
CpeAuthenticationConsumerDmaapModel expectedFirstEventObject =
ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -203,7 +208,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
.swVersion(swVersion)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedFirstEventObject)
.expectNext(expectedSecondEventObject);
@@ -229,7 +234,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -254,7 +259,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -279,7 +284,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -304,7 +309,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
index ca448dd0..cd238e2d 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
+import java.io.StringReader;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
@@ -89,21 +91,22 @@ class ReRegistrationDmaapConsumerJsonParserTest {
}
@Test
- void passingNonJson_EmptyFluxIsReturned() {
+ void passingNonJson_getIllegalStateException() {
ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+ JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ jsonReader.setLenient(true);
+ JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
.expectSubscription()
- .verifyComplete();
+ .verifyError(IllegalStateException.class);
}
@Test
void passingNoEvents_EmptyFluxIsReturned() {
ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
}
@@ -135,7 +138,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
.sVlan(svlan)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedEventObject);
}
@@ -165,7 +168,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + secondEvent + "]";
+ String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId1)
@@ -182,7 +185,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
.sVlan(svlan)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedFirstEventObject)
.expectNext(expectedSecondEventObject);
@@ -209,7 +212,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -235,7 +238,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -261,7 +264,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
@@ -289,7 +292,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}