diff options
Diffstat (limited to 'components/bbs-event-processor/src/main/java')
12 files changed, 120 insertions, 61 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java index 981d9633..5022a693 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java @@ -174,6 +174,13 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { synchronized (this) { cbsPollingInterval = newConfiguration.cbsPollingIntervalSec(); + securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth()); + securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth()); + securityProperties.setKeyStorePath(newConfiguration.keyStorePath()); + securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath()); + securityProperties.setTrustStorePath(newConfiguration.trustStorePath()); + securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath()); + GeneratedAppConfigObject.StreamsObject reRegObject = getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(), "PNF Re-Registration"); @@ -181,6 +188,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername()); + dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword()); dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); @@ -196,6 +205,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername()); + dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword()); dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId()); @@ -211,6 +222,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost()); dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort()); dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol()); + dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername()); + dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword()); dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType()); dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName()); constructDmaapProducerConfiguration(); @@ -361,7 +374,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable { throw new ConfigurationParsingException("Wrong topic name structure"); } topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0])); - topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]); + topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]); return topicUrlInfo; } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java index 1d27fc0a..607b3b31 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java @@ -39,6 +39,8 @@ import org.onap.bbs.event.processor.model.ImmutableDmaapInfo; import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject; import org.onap.bbs.event.processor.model.ImmutableStreamsObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; @@ -123,12 +125,12 @@ public class ConsulConfigurationGateway { // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name) EnvProperties env = EnvProperties.fromEnvironment(); - + CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext); // Create the client and use it to get the configuration cbsFetchPipeline = CbsClientFactory.createCbsClient(env) .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e)) .retry(e -> true) - .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period)) + .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period)) .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors); } @@ -178,6 +180,13 @@ public class ConsulConfigurationGateway { final String loggingLevel = configObject.get("application.loggingLevel").getAsString(); + final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString(); + final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString(); + final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString(); + final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString(); + final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean(); + final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean(); + final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes"); final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes"); @@ -211,6 +220,12 @@ public class ConsulConfigurationGateway { .cpeAuthConfigKey(cpeAuthConfigKey) .closeLoopConfigKey(closeLoopConfigKey) .loggingLevel(loggingLevel) + .keyStorePath(keyStorePath) + .keyStorePasswordPath(keyStorePasswordPath) + .trustStorePath(trustStorePath) + .trustStorePasswordPath(trustStorePasswordPath) + .enableAaiCertAuth(aaiEnableCertAuth) + .enableDmaapCertAuth(dmaapEnableCertAuth) .streamSubscribesMap(parseStreamsObjects(streamsSubscribes)) .streamPublishesMap(parseStreamsObjects(streamsPublishes)) .build(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java index 4fdb81be..41a8a34f 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java @@ -123,6 +123,25 @@ public interface GeneratedAppConfigObject { @SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel") String loggingLevel(); + @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath") + String trustStorePath(); + + @SerializedName(value = "application.ssl.trustStorePasswordPath", + alternate = "application.ssl.trustStorePasswordPath") + String trustStorePasswordPath(); + + @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath") + String keyStorePath(); + + @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath") + String keyStorePasswordPath(); + + @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth") + boolean enableAaiCertAuth(); + + @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth") + boolean enableDmaapCertAuth(); + @SerializedName(value = "streams_subscribes", alternate = "streams_subscribes") Map<String, StreamsObject> streamSubscribesMap(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java index 711ab185..a30903bb 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java @@ -49,11 +49,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -99,7 +99,7 @@ public class CpeAuthenticationPipeline { LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started"); } - Flux<ResponseEntity<String>> executePipeline() { + Flux<HttpResponse> executePipeline() { return // Consume CPE Authentication from DMaaP consumeCpeAuthenticationFromDmaap() @@ -111,11 +111,11 @@ public class CpeAuthenticationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpResponse responseCode) { + MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); LOGGER.info("CPE Authentication event successfully handled. " + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase()); + responseCode.statusCode(), responseCode.statusReason()); MDC.remove(RESPONSE_CODE); } @@ -171,8 +171,10 @@ public class CpeAuthenticationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving PNF: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -214,8 +216,10 @@ public class CpeAuthenticationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -226,7 +230,7 @@ public class CpeAuthenticationPipeline { }); } - private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) { + private Mono<HttpResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { return Mono.empty(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java index 9a42ed21..33a9aea7 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java @@ -48,11 +48,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -98,7 +98,7 @@ public class ReRegistrationPipeline { LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started"); } - Flux<ResponseEntity<String>> executePipeline() { + Flux<HttpResponse> executePipeline() { return // Consume Re-Registration from DMaaP consumeReRegistrationsFromDmaap() @@ -110,11 +110,11 @@ public class ReRegistrationPipeline { .flatMap(this::triggerPolicy); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpResponse responseCode) { + MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode())); LOGGER.info("PNF Re-Registration event successfully handled. " + "Publishing to DMaaP for Policy returned a status code of ({} {})", - responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase()); + responseCode.statusCode(), responseCode.statusReason()); MDC.remove(RESPONSE_CODE); } @@ -170,8 +170,10 @@ public class ReRegistrationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving PNF: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -219,8 +221,10 @@ public class ReRegistrationPipeline { .doOnError(TimeoutException.class, e -> LOGGER.warn("Timed out waiting for A&AI response") ) - .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}", - e.getMessage()) + .doOnError(e -> { + LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage()); + LOGGER.debug("Error\n", e); + } ) .onErrorResume( e -> e instanceof Exception, @@ -259,7 +263,7 @@ public class ReRegistrationPipeline { return isNotRelocation; } - private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) { + private Mono<HttpResponse> triggerPolicy(PipelineState state) { if (state == null || state.getHsiCfsServiceInstance() == null) { return Mono.empty(); diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index e6bef523..da510281 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,6 +20,10 @@ package org.onap.bbs.event.processor.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; @@ -85,6 +89,8 @@ public class DmaapCpeAuthenticationConsumerTaskImpl public synchronized void updateConfiguration() { try { LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + LOGGER.info("Creating secure context with:\n {}", + this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); } catch (SSLException e) { LOGGER.error("SSL error while updating HTTP Client after a config update"); @@ -96,7 +102,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<String> response = httpClient.getDMaaPConsumerResponse(); + Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index bddd2ecc..749c4e53 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,11 +21,11 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.springframework.http.ResponseEntity; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import reactor.core.publisher.Mono; public interface DmaapPublisherTask { - Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); + Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 7b227211..283e5ef9 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,21 +20,24 @@ package org.onap.bbs.event.processor.tasks; +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @@ -59,7 +62,12 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration this.configuration = configuration; this.httpClientFactory = httpClientFactory; - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + try { + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + } catch (SSLException e) { + LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); + LOGGER.debug("SSL exception\n", e); + } } @PostConstruct @@ -75,17 +83,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + try { + LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); + httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); + } catch (SSLException e) { + LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); + LOGGER.debug("SSL exception\n", e); + } } @Override - public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { + public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { if (controlLoopPublisherDmaapModel == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } - LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel); + LOGGER.info("Executing task for publishing control loop message"); + LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); } private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index 92f5a86f..e40037b1 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,6 +20,10 @@ package org.onap.bbs.event.processor.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.net.ssl.SSLException; @@ -85,6 +89,8 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC public synchronized void updateConfiguration() { try { LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + LOGGER.info("Creating secure context with:\n {}", + this.configuration.getDmaapReRegistrationConsumerConfiguration()); httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); } catch (SSLException e) { LOGGER.error("SSL error while updating HTTP Client after a config update"); @@ -96,7 +102,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<String> response = httpClient.getDMaaPConsumerResponse(); + Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 19b81a80..84fc9f7d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -181,6 +181,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private SslContext createSslContext() throws SSLException { if (aaiClientConfiguration.enableAaiCertAuth()) { + LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration); return sslFactory.createSecureContext( aaiClientConfiguration.keyStorePath(), aaiClientConfiguration.keyStorePasswordPath(), diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java index 2bb5d98a..3cff4e65 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java @@ -37,7 +37,6 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import java.util.Optional; import java.util.stream.StreamSupport; @@ -79,20 +78,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser { * @param dmaapResponse Response from DMaaP * @return CPE Authentication Consumer DMaaP reactive model */ - public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) { + public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) { return dmaapResponse - .flatMapMany(this::parseToMono) - .flatMap(this::createTargetFlux); - } - - private Mono<JsonElement> parseToMono(String message) { - if (StringUtils.isEmpty(message)) { - LOGGER.warn("DMaaP response is empty"); - return Mono.empty(); - } - return Mono.fromCallable(() -> new JsonParser().parse(message)) - .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring")) - .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty()); + .flatMapMany(this::createTargetFlux); } private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java index 947d7a7c..9fe0c277 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java @@ -32,7 +32,6 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import java.util.Optional; import java.util.stream.StreamSupport; @@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser { * @param dmaapResponse Response from DMaaP * @return Re-Registration Consumer DMaaP reactive model */ - public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) { + public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) { return dmaapResponse - .flatMapMany(this::parseToMono) - .flatMap(this::createTargetFlux); - } - - private Mono<JsonElement> parseToMono(String message) { - if (StringUtils.isEmpty(message)) { - LOGGER.warn("DMaaP response is empty"); - return Mono.empty(); - } - return Mono.fromCallable(() -> new JsonParser().parse(message)) - .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring")) - .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty()); + .flatMapMany(this::createTargetFlux); } private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) { |