summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'components/bbs-event-processor/src/main/java')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java15
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java19
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java19
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java24
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java24
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java8
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java4
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java27
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java8
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java1
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java16
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java16
12 files changed, 120 insertions, 61 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 981d9633..5022a693 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -174,6 +174,13 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
synchronized (this) {
cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
+ securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth());
+ securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth());
+ securityProperties.setKeyStorePath(newConfiguration.keyStorePath());
+ securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
+ securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
+ securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+
GeneratedAppConfigObject.StreamsObject reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
@@ -181,6 +188,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername());
+ dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword());
dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -196,6 +205,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword());
dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -211,6 +222,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername());
+ dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
constructDmaapProducerConfiguration();
@@ -361,7 +374,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
throw new ConfigurationParsingException("Wrong topic name structure");
}
topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
- topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
return topicUrlInfo;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 1d27fc0a..607b3b31 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -39,6 +39,8 @@ import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
@@ -123,12 +125,12 @@ public class ConsulConfigurationGateway {
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
EnvProperties env = EnvProperties.fromEnvironment();
-
+ CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
- .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
+ .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
.subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
}
@@ -178,6 +180,13 @@ public class ConsulConfigurationGateway {
final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+
final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
@@ -211,6 +220,12 @@ public class ConsulConfigurationGateway {
.cpeAuthConfigKey(cpeAuthConfigKey)
.closeLoopConfigKey(closeLoopConfigKey)
.loggingLevel(loggingLevel)
+ .keyStorePath(keyStorePath)
+ .keyStorePasswordPath(keyStorePasswordPath)
+ .trustStorePath(trustStorePath)
+ .trustStorePasswordPath(trustStorePasswordPath)
+ .enableAaiCertAuth(aaiEnableCertAuth)
+ .enableDmaapCertAuth(dmaapEnableCertAuth)
.streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
.streamPublishesMap(parseStreamsObjects(streamsPublishes))
.build();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
index 4fdb81be..41a8a34f 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
@@ -123,6 +123,25 @@ public interface GeneratedAppConfigObject {
@SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel")
String loggingLevel();
+ @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath")
+ String trustStorePath();
+
+ @SerializedName(value = "application.ssl.trustStorePasswordPath",
+ alternate = "application.ssl.trustStorePasswordPath")
+ String trustStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath")
+ String keyStorePath();
+
+ @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath")
+ String keyStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth")
+ boolean enableAaiCertAuth();
+
+ @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth")
+ boolean enableDmaapCertAuth();
+
@SerializedName(value = "streams_subscribes", alternate = "streams_subscribes")
Map<String, StreamsObject> streamSubscribesMap();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index 711ab185..a30903bb 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -49,11 +49,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -99,7 +99,7 @@ public class CpeAuthenticationPipeline {
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
@@ -111,11 +111,11 @@ public class CpeAuthenticationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("CPE Authentication event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
@@ -171,8 +171,10 @@ public class CpeAuthenticationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -214,8 +216,10 @@ public class CpeAuthenticationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -226,7 +230,7 @@ public class CpeAuthenticationPipeline {
});
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 9a42ed21..33a9aea7 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -48,11 +48,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@@ -98,7 +98,7 @@ public class ReRegistrationPipeline {
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
@@ -110,11 +110,11 @@ public class ReRegistrationPipeline {
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("PNF Re-Registration event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
@@ -170,8 +170,10 @@ public class ReRegistrationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -219,8 +221,10 @@ public class ReRegistrationPipeline {
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
@@ -259,7 +263,7 @@ public class ReRegistrationPipeline {
return isNotRelocation;
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index e6bef523..da510281 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,6 +20,10 @@
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index bddd2ecc..749c4e53 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.springframework.http.ResponseEntity;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import reactor.core.publisher.Mono;
public interface DmaapPublisherTask {
- Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 7b227211..283e5ef9 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,21 +20,24 @@
package org.onap.bbs.event.processor.tasks;
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@@ -59,7 +62,12 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
this.configuration = configuration;
this.httpClientFactory = httpClientFactory;
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@PostConstruct
@@ -75,17 +83,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@Override
- public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
+ public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
if (controlLoopPublisherDmaapModel == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
- LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel);
+ LOGGER.info("Executing task for publishing control loop message");
+ LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
}
private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index 92f5a86f..e40037b1 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,6 +20,10 @@
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapReRegistrationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 19b81a80..84fc9f7d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -181,6 +181,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
private SslContext createSslContext() throws SSLException {
if (aaiClientConfiguration.enableAaiCertAuth()) {
+ LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
return sslFactory.createSecureContext(
aaiClientConfiguration.keyStorePath(),
aaiClientConfiguration.keyStorePasswordPath(),
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
index 2bb5d98a..3cff4e65 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
@@ -37,7 +37,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
@@ -79,20 +78,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
* @param dmaapResponse Response from DMaaP
* @return CPE Authentication Consumer DMaaP reactive model
*/
- public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 947d7a7c..9fe0c277 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -32,7 +32,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
@@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser {
* @param dmaapResponse Response from DMaaP
* @return Re-Registration Consumer DMaaP reactive model
*/
- public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {