diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2019-03-07 10:47:59 +0100 |
---|---|---|
committer | Marcin Migdal <marcin.migdal@nokia.com> | 2019-03-20 11:59:16 +0100 |
commit | 295746ec486e0300e7d5958ba44f8054c30389f4 (patch) | |
tree | 3b43650f5878d00af3193a82d8394b1e0fcac1c0 /prh-app-server/src/main/java/org | |
parent | d824f1434f7ea71f1efdf3e9a667f6724ae600d3 (diff) |
Integrate PRH with SDK in version 1.1.4
- AAI client aligned
- CBS client aligned
- DmaaP client aligned
Change-Id: I7afd0a44572e1097be5c3a4acc7221c7923cea8b
Issue-ID: DCAEGEN2-1319
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
10 files changed, 74 insertions, 64 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index 3967dc0a..5ea07e0a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -25,6 +25,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.REQUEST_ import java.util.Map; import java.util.UUID; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient; import org.slf4j.MDC; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -58,4 +59,10 @@ public class MainApp { TaskScheduler concurrentTaskScheduler() { return new ConcurrentTaskScheduler(); } + + + @Bean + CloudConfigurationClient getCloudConfigurationClient(){ + return new CloudConfigurationClient(); + } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index 08c99621..d2849500 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -23,7 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -60,7 +61,6 @@ public class CloudConfiguration extends AppConfig { @Value("#{systemEnvironment}") private Properties systemEnvironment; - @Autowired public void setThreadPoolTaskScheduler(CloudConfigurationClient prhConfigurationProvider) { this.prhConfigurationProvider = prhConfigurationProvider; diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java index 793fcc27..3d765bde 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java @@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 00a6d465..a69b7c54 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.service; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -77,26 +78,14 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { + public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonArray> monoMessage) { return monoMessage - .flatMapMany(this::getJsonParserMessage) - .flatMap(this::createJsonConsumerModel); + .flatMapMany(this::getConsumerDmaapModelFromJsonArray); } - private Mono<JsonElement> getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty") - : Mono.fromCallable(() -> new JsonParser().parse(message)); - } - - private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() - ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getConsumerDmaapModelFromJsonArray(jsonElement); - } - - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray jsonElement) { return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.spliterator(), false) .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) .orElseGet(JsonObject::new))))); } @@ -115,7 +104,6 @@ public class DmaapConsumerJsonParser { } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { - JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) @@ -123,7 +111,6 @@ public class DmaapConsumerJsonParser { this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); - this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 8e31807a..705b085b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiReactiveWebClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; + +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; @@ -40,14 +42,14 @@ public abstract class AaiProducerTask { abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiReactiveHttpPatchClient resolveClient() throws SSLException; + abstract AaiHttpPatchClient resolveClient(); protected abstract AaiClientConfiguration resolveConfiguration(); protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - WebClient buildWebClient() throws SSLException { - return new AaiReactiveWebClientFactory(new SslFactory(), resolveConfiguration()).build(); + CloudHttpClient buildHttpClient() { + return new AaiHttpClientFactory(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index 355ca905..a34549d8 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -20,22 +20,22 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; -import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; - +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; /** @@ -47,7 +47,7 @@ public class AaiProducerTaskImpl extends AaiProducerTask { private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); private final Config config; - private AaiReactiveHttpPatchClient aaiReactiveHttpPatchClient; + private AaiHttpPatchClient aaiHttpPatchClient; @Autowired public AaiProducerTaskImpl(Config config) { @@ -56,20 +56,20 @@ public class AaiProducerTaskImpl extends AaiProducerTask { @Override Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { - LOGGER.info("Publish to AAI DmaapModel"); - return aaiReactiveHttpPatchClient.getAaiProducerResponse(consumerDmaapModel) - .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { - return Mono.just(consumerDmaapModel); - } - return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); - }); + Mono<HttpClientResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); + return resposne.flatMap(response -> { + if (HttpUtils.isSuccessfulResponseCode(response.status().code())) { + return Mono.just(consumerDmaapModel); + } + return Mono + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.status().code())); + }); } @Override - AaiReactiveHttpPatchClient resolveClient() throws SSLException { - return new AaiReactiveHttpPatchClient(resolveConfiguration(), new AaiJsonBodyBuilderImpl()).createAaiWebClient(buildWebClient()); + AaiHttpPatchClient resolveClient() { + return new AaiHttpPatchClient(resolveConfiguration(), + new AaiJsonBodyBuilderImpl()).createAaiHttpClient(new AaiHttpClientFactory(resolveConfiguration()).build()); } @Override @@ -78,12 +78,11 @@ public class AaiProducerTaskImpl extends AaiProducerTask { } @Override - protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) - throws PrhTaskException, SSLException { + protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - aaiReactiveHttpPatchClient = resolveClient(); + aaiHttpPatchClient = resolveClient(); LOGGER.debug("Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index fd7bca1e..69246090 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,6 +20,9 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonArray; +import java.util.Objects; +import java.util.Optional; import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** @@ -69,7 +73,10 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); - return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); + + Mono<JsonArray> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse( + Optional.empty()); + return dmaapConsumerJsonParser.getJsonObject(response); } @Override diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 4d6c0f87..ec8ffaff 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -20,11 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; - -import org.springframework.http.ResponseEntity; +import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; /** @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono; */ interface DmaapPublisherTask { - Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; + Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - DMaaPPublisherReactiveHttpClient resolveClient(); + DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 49accdd4..85b18b8a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,11 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; +import java.util.Optional; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; @@ -32,8 +35,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.produce import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; /** @@ -43,7 +46,8 @@ import reactor.core.publisher.Mono; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Config config; + private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private final PublisherReactiveHttpClientFactory httpClientFactory; @Autowired @@ -52,22 +56,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { } DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { - this.config = config; + this.dmaapPublisherConfiguration = config.getDmaapPublisherConfiguration(); this.httpClientFactory = httpClientFactory; } @Override - public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { + public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty()); } @Override - public DMaaPPublisherReactiveHttpClient resolveClient() { - return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{ + return httpClientFactory.create(dmaapPublisherConfiguration); + } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 2924225b..7ecf4a6e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -39,10 +39,10 @@ import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -108,10 +108,13 @@ public class ScheduledTasks { logger.info("PRH tasks have been completed"); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + private void onSuccess(HttpClientResponse response) { + + String statusCode = Integer.toString(response.status().code()); + + MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - responseCode.getStatusCode().value()); + statusCode); MDC.remove(RESPONSE_CODE); } @@ -148,10 +151,10 @@ public class ScheduledTasks { } } - private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { + private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } |